diff --git a/go.mod b/go.mod index 6c92171..f55412d 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.1 github.com/gorilla/sessions v1.4.0 - github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/jmoiron/sqlx v1.4.0 github.com/joho/godotenv v1.5.1 github.com/mattn/go-sqlite3 v1.14.24 @@ -42,7 +41,6 @@ require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect - github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect @@ -60,7 +58,6 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect - github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect github.com/jonboulle/clockwork v0.5.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect @@ -72,7 +69,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index b4c3781..a935407 100644 --- a/go.sum +++ b/go.sum @@ -16,7 +16,6 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/PuerkitoBio/goquery v1.9.3 h1:mpJr/ikUA9/GNJB/DBZcGeFDXUtosHRyRrwh7KGdTG0= github.com/PuerkitoBio/goquery v1.9.3/go.mod h1:1ndLHPdTz+DyQPICCWYlYQMPl0oXZj0G6D4LCYA6u4U= -github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= @@ -25,13 +24,10 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss= github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU= -github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= -github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= 
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo= @@ -123,10 +119,6 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= -github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= -github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= -github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU= -github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= @@ -151,7 +143,6 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -186,8 +177,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= -github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= @@ -219,7 +208,6 @@ github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4= github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg= -github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 3af37ad..a938cb6 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -278,7 +278,7 @@ func TestRestApi(t *testing.T) { job.MonitoringStatus != 1 || job.SMT != 1 || !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || - job.StartTime.Unix() != 123456789 { + job.StartTime != 123456789 { t.Fatalf("unexpected job properties: %#v", job) } diff --git a/internal/api/rest.go b/internal/api/rest.go index fe35942..31a5979 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -150,9 +150,9 @@ type DeleteJobApiRequest struct { // GetJobsApiResponse model type GetJobsApiResponse struct { - Jobs []*schema.JobMeta `json:"jobs"` // Array of jobs - Items int `json:"items"` // Number of jobs returned - Page int `json:"page"` // Page id returned + Jobs []*schema.Job `json:"jobs"` // Array of jobs + Items int `json:"items"` // Number of jobs returned + Page int `json:"page"` // Page id returned } // GetClustersApiResponse model @@ -361,7 +361,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { return } - results := make([]*schema.JobMeta, 0, len(jobs)) + results := make([]*schema.Job, 0, len(jobs)) for _, job := range jobs { if withMetadata { if _, err = api.JobRepository.FetchMetadata(job); err != nil { @@ -370,27 +370,21 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { } } - res := &schema.JobMeta{ - ID: &job.ID, - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - } - - res.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return } - if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { - res.Statistics, err = archive.GetStatistics(job) + if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { + job.Statistics, err = archive.GetStatistics(job) if err != nil { handleError(err, http.StatusInternalServerError, rw) return } } - results = append(results, res) + results = append(results, job) } log.Debugf("/api/jobs: %d jobs returned", len(results)) @@ -449,7 +443,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) return } - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -542,7 +536,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -683,7 +677,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) + job.Tags, err = 
api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -696,7 +690,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { } for _, tag := range req { - tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), job.ID, tag.Type, tag.Name, tag.Scope) + tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -745,7 +739,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -764,7 +758,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { continue } - remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), job.ID, rtag.Type, rtag.Name, rtag.Scope) + remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -840,7 +834,10 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { // @security ApiKeyAuth // @router /api/jobs/start_job/ [post] func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { - req := schema.JobMeta{BaseJob: schema.JobDefaults} + req := schema.Job{ + Exclusive: 1, + MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, + } if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) return @@ -849,7 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { log.Printf("REST: %s\n", req.GoString()) req.State = schema.JobStateRunning - if err := importer.SanityChecks(&req.BaseJob); err != nil { + if err := importer.SanityChecks(&req); err != nil { handleError(err, http.StatusBadRequest, rw) return } @@ -866,7 +863,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } else if err == nil { for _, job := range jobs { - if (req.StartTime - job.StartTimeUnix) < 86400 { + if (req.StartTime - job.StartTime) < 86400 { handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) return } @@ -1023,7 +1020,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) return } - err = api.JobRepository.DeleteJobById(job.ID) + err = api.JobRepository.DeleteJobById(*job.ID) if err != nil { handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) return @@ -1087,8 +1084,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo return } - if job == nil || job.StartTime.Unix() > req.StopTime { - handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime.Unix()), http.StatusBadRequest, rw) + if job == nil || job.StartTime > req.StopTime { + 
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) return } @@ -1100,11 +1097,11 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo } // Mark job as stopped in the database (update state and duration) - job.Duration = int32(req.StopTime - job.StartTime.Unix()) + job.Duration = int32(req.StopTime - job.StartTime) job.State = req.State api.JobRepository.Mutex.Lock() - if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { api.JobRepository.Mutex.Unlock() handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) return @@ -1112,7 +1109,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo } api.JobRepository.Mutex.Unlock() - log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) // Send a response (with status OK). This means that erros that happen from here on forward // can *NOT* be communicated to the client. 
If reading from a MetricDataRepository or diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 42a60b9..e9f3dc9 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -41,7 +41,7 @@ func archivingWorker() { // will fail if job meta not in repository if _, err := jobRepo.FetchMetadata(job); err != nil { log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) - jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) continue } @@ -50,7 +50,7 @@ func archivingWorker() { jobMeta, err := ArchiveJob(job, context.Background()) if err != nil { log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) - jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) continue } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 1050ca1..b220d3b 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -16,7 +16,7 @@ import ( ) // Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { +func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) { allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(job.Cluster).MetricConfig for _, mc := range metricConfigs { @@ -40,11 +40,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { return nil, err } - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } + job.Statistics = make(map[string]schema.JobStatistics) for metric, data := range jobData { avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 @@ -61,7 +57,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { } // Round AVG Result to 2 Digits - jobMeta.Statistics[metric] = schema.JobStatistics{ + job.Statistics[metric] = schema.JobStatistics{ Unit: schema.Unit{ Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, @@ -76,8 +72,8 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { // only return the JobMeta structure as the // statistics in there are needed. 
if config.Keys.DisableArchive { - return jobMeta, nil + return job, nil } - return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) + return job, archive.GetHandle().ImportJob(job, &jobData) } diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index e73bcf1..60e3ca0 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -398,6 +398,8 @@ type ClusterResolver interface { Partitions(ctx context.Context, obj *schema.Cluster) ([]string, error) } type JobResolver interface { + StartTime(ctx context.Context, obj *schema.Job) (*time.Time, error) + Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) @@ -5456,9 +5458,9 @@ func (ec *executionContext) _Job_id(ctx context.Context, field graphql.Collected } return graphql.Null } - res := resTmp.(int64) + res := resTmp.(*int64) fc.Result = res - return ec.marshalNID2int64(ctx, field.Selections, res) + return ec.marshalNID2ᚖint64(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Job_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -5708,7 +5710,7 @@ func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.Co }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { ctx = rctx // use context from middleware stack in children - return obj.StartTime, nil + return ec.resolvers.Job().StartTime(rctx, obj) }) if err != nil { ec.Error(ctx, err) @@ -5720,17 +5722,17 @@ func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.Co } return graphql.Null } - res := resTmp.(time.Time) + res := resTmp.(*time.Time) fc.Result = res - return ec.marshalNTime2timeᚐTime(ctx, field.Selections, res) + return ec.marshalNTime2ᚖtimeᚐTime(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Job_startTime(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Job", Field: field, - IsMethod: false, - IsResolver: false, + IsMethod: true, + IsResolver: true, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type Time does not have child fields") }, @@ -17424,10 +17426,41 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj atomic.AddUint32(&out.Invalids, 1) } case "startTime": - out.Values[i] = ec._Job_startTime(ctx, field, obj) - if out.Values[i] == graphql.Null { - atomic.AddUint32(&out.Invalids, 1) + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Job_startTime(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return 
innerFunc(ctx, out) })
		case "duration":
			out.Values[i] = ec._Job_duration(ctx, field, obj)
			if out.Values[i] == graphql.Null {
				atomic.AddUint32(&out.Invalids, 1)
			}
@@ -20580,6 +20613,27 @@ func (ec *executionContext) marshalNID2ᚕstringᚄ(ctx context.Context, sel ast
 	return ret
 }
+func (ec *executionContext) unmarshalNID2ᚖint64(ctx context.Context, v any) (*int64, error) {
+	res, err := graphql.UnmarshalInt64(v)
+	return &res, graphql.ErrorOnPath(ctx, err)
+}
+
+func (ec *executionContext) marshalNID2ᚖint64(ctx context.Context, sel ast.SelectionSet, v *int64) graphql.Marshaler {
+	if v == nil {
+		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+		}
+		return graphql.Null
+	}
+	res := graphql.MarshalInt64(*v)
+	if res == graphql.Null {
+		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+		}
+	}
+	return res
+}
+
 func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) {
 	res, err := graphql.UnmarshalInt(v)
 	return res, graphql.ErrorOnPath(ctx, err)
@@ -21799,6 +21853,27 @@ func (ec *executionContext) marshalNTime2timeᚐTime(ctx context.Context, sel as
 	return res
 }
+func (ec *executionContext) unmarshalNTime2ᚖtimeᚐTime(ctx context.Context, v any) (*time.Time, error) {
+	res, err := graphql.UnmarshalTime(v)
+	return &res, graphql.ErrorOnPath(ctx, err)
+}
+
+func (ec *executionContext) marshalNTime2ᚖtimeᚐTime(ctx context.Context, sel ast.SelectionSet, v *time.Time) graphql.Marshaler {
+	if v == nil {
+		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+		}
+		return graphql.Null
+	}
+	res := graphql.MarshalTime(*v)
+	if res == graphql.Null {
+		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+		}
+	}
+	return res
+}
+
 func (ec *executionContext) marshalNTimeWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐTimeWeights(ctx context.Context, sel ast.SelectionSet, v *model.TimeWeights) graphql.Marshaler {
 	if v == nil {
 		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index 7e52b3d..6b790a5 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -29,9 +29,16 @@ func (r *clusterResolver) Partitions(ctx context.Context, obj *schema.Cluster) (
 	return r.Repo.Partitions(obj.Name)
 }
+// StartTime is the resolver for the startTime field.
+func (r *jobResolver) StartTime(ctx context.Context, obj *schema.Job) (*time.Time, error) {
+	// The job row stores its start time as a unix timestamp; wrap it for the GraphQL Time scalar.
+	timestamp := time.Unix(obj.StartTime, 0)
+	return &timestamp, nil
+}
+
 // Tags is the resolver for the tags field.
 func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
-	return r.Repo.GetTags(repository.GetUserFromContext(ctx), &obj.ID)
+	return r.Repo.GetTags(repository.GetUserFromContext(ctx), obj.ID)
 }
 // ConcurrentJobs is the resolver for the concurrentJobs field.
@@ -615,9 +620,9 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job numThreadsInt := int(job.NumHWThreads) numAccsInt := int(job.NumAcc) res = append(res, &model.JobStats{ - ID: int(job.ID), + ID: int(*job.ID), JobID: strconv.Itoa(int(job.JobID)), - StartTime: int(job.StartTime.Unix()), + StartTime: int(job.StartTime), Duration: int(job.Duration), Cluster: job.Cluster, SubCluster: job.SubCluster, @@ -776,11 +781,9 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type ( - clusterResolver struct{ *Resolver } - jobResolver struct{ *Resolver } - metricValueResolver struct{ *Resolver } - mutationResolver struct{ *Resolver } - queryResolver struct{ *Resolver } - subClusterResolver struct{ *Resolver } -) +type clusterResolver struct{ *Resolver } +type jobResolver struct{ *Resolver } +type metricValueResolver struct{ *Resolver } +type mutationResolver struct{ *Resolver } +type queryResolver struct{ *Resolver } +type subClusterResolver struct{ *Resolver } diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index 623291c..83230f5 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -42,7 +42,10 @@ func HandleImportFlag(flag string) error { } dec := json.NewDecoder(bytes.NewReader(raw)) dec.DisallowUnknownFields() - job := schema.JobMeta{BaseJob: schema.JobDefaults} + job := schema.Job{ + Exclusive: 1, + MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, + } if err = dec.Decode(&job); err != nil { log.Warn("Error while decoding raw json metadata for import") return err @@ -141,7 +144,7 @@ func HandleImportFlag(flag string) error { return err } - if err = SanityChecks(&job.BaseJob); err != nil { + if err = SanityChecks(&job); err != nil { log.Warn("BaseJob SanityChecks failed") return err } diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 9a2ccdf..1239951 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -60,11 +60,6 @@ func InitDB() error { } jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful - job := schema.Job{ - BaseJob: jobMeta.BaseJob, - StartTime: time.Unix(jobMeta.StartTime, 0), - StartTimeUnix: jobMeta.StartTime, - } sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { @@ -72,7 +67,7 @@ func InitDB() error { return err } - job.Footprint = make(map[string]float64) + jobMeta.Footprint = make(map[string]float64) for _, fp := range sc.Footprint { statType := "avg" @@ -83,16 +78,16 @@ func InitDB() error { name := fmt.Sprintf("%s_%s", fp, statType) - job.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType) + jobMeta.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType) } - job.RawFootprint, err = json.Marshal(job.Footprint) + jobMeta.RawFootprint, err = json.Marshal(jobMeta.Footprint) if err != nil { log.Warn("Error while marshaling job footprint") return err } - job.EnergyFootprint = make(map[string]float64) + jobMeta.EnergyFootprint = make(map[string]float64) // Total Job Energy Outside Loop totalEnergy := 0.0 @@ -117,45 +112,45 @@ func InitDB() error { log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) } - job.EnergyFootprint[fp] = metricEnergy + jobMeta.EnergyFootprint[fp] = metricEnergy totalEnergy += 
metricEnergy
 		}
-		job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
-		if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
+		jobMeta.Energy = (math.Round(totalEnergy*100.0) / 100.0)
+		if jobMeta.RawEnergyFootprint, err = json.Marshal(jobMeta.EnergyFootprint); err != nil {
 			log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
 			return err
 		}
-		job.RawResources, err = json.Marshal(job.Resources)
+		jobMeta.RawResources, err = json.Marshal(jobMeta.Resources)
 		if err != nil {
 			log.Errorf("repository initDB(): %v", err)
 			errorOccured++
 			continue
 		}
-		job.RawMetaData, err = json.Marshal(job.MetaData)
+		jobMeta.RawMetaData, err = json.Marshal(jobMeta.MetaData)
 		if err != nil {
 			log.Errorf("repository initDB(): %v", err)
 			errorOccured++
 			continue
 		}
-		if err := SanityChecks(&job.BaseJob); err != nil {
+		if err := SanityChecks(jobMeta); err != nil {
 			log.Errorf("repository initDB(): %v", err)
 			errorOccured++
 			continue
 		}
 		id, err := r.TransactionAddNamed(t,
-			repository.NamedJobInsert, job)
+			repository.NamedJobInsert, jobMeta)
 		if err != nil {
 			log.Errorf("repository initDB(): %v", err)
 			errorOccured++
 			continue
 		}
-		for _, tag := range job.Tags {
+		for _, tag := range jobMeta.Tags {
 			tagstr := tag.Name + ":" + tag.Type
 			tagId, ok := tags[tagstr]
 			if !ok {
@@ -190,7 +185,7 @@ func InitDB() error {
 }
 // This function also sets the subcluster if necessary!
-func SanityChecks(job *schema.BaseJob) error {
+func SanityChecks(job *schema.Job) error {
 	if c := archive.GetCluster(job.Cluster); c == nil {
 		return fmt.Errorf("no such cluster: %v", job.Cluster)
 	}
diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go
index 7c84d93..557e1d2 100644
--- a/internal/metricdata/cc-metric-store.go
+++ b/internal/metricdata/cc-metric-store.go
@@ -183,8 +183,8 @@ func (ccms *CCMetricStore) LoadData(
 	req := ApiQueryRequest{
 		Cluster: job.Cluster,
-		From: job.StartTime.Unix(),
-		To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(),
+		From: job.StartTime,
+		To: job.StartTime + int64(job.Duration),
 		Queries: queries,
 		WithStats: true,
 		WithData: true,
@@ -570,7 +570,6 @@ func (ccms *CCMetricStore) LoadStats(
 	metrics []string,
 	ctx context.Context,
 ) (map[string]map[string]schema.MetricStatistics, error) {
-
 	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope here for analysis view accelerator normalization?
if err != nil { log.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) @@ -579,8 +578,8 @@ func (ccms *CCMetricStore) LoadStats( req := ApiQueryRequest{ Cluster: job.Cluster, - From: job.StartTime.Unix(), - To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), + From: job.StartTime, + To: job.StartTime + int64(job.Duration), Queries: queries, WithStats: true, WithData: false, @@ -638,8 +637,8 @@ func (ccms *CCMetricStore) LoadScopedStats( req := ApiQueryRequest{ Cluster: job.Cluster, - From: job.StartTime.Unix(), - To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), + From: job.StartTime, + To: job.StartTime + int64(job.Duration), Queries: queries, WithStats: true, WithData: false, @@ -816,7 +815,6 @@ func (ccms *CCMetricStore) LoadNodeListData( page *model.PageRequest, ctx context.Context, ) (map[string]schema.JobData, int, bool, error) { - // 0) Init additional vars var totalNodes int = 0 var hasNextPage bool = false @@ -975,7 +973,6 @@ func (ccms *CCMetricStore) buildNodeQueries( scopes []schema.MetricScope, resolution int, ) ([]ApiQuery, []schema.MetricScope, error) { - queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go deleted file mode 100644 index c53dad3..0000000 --- a/internal/metricdata/influxdb-v2.go +++ /dev/null @@ -1,575 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package metricdata - -import ( - "context" - "crypto/tls" - "encoding/json" - "errors" - "fmt" - "math" - "sort" - "strings" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" -) - -type InfluxDBv2DataRepositoryConfig struct { - Url string `json:"url"` - Token string `json:"token"` - Bucket string `json:"bucket"` - Org string `json:"org"` - SkipTls bool `json:"skiptls"` -} - -type InfluxDBv2DataRepository struct { - client influxdb2.Client - queryClient influxdb2Api.QueryAPI - bucket, measurement string -} - -func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error { - var config InfluxDBv2DataRepositoryConfig - if err := json.Unmarshal(rawConfig, &config); err != nil { - log.Warn("Error while unmarshaling raw json config") - return err - } - - idb.client = influxdb2.NewClientWithOptions(config.Url, config.Token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: config.SkipTls})) - idb.queryClient = idb.client.QueryAPI(config.Org) - idb.bucket = config.Bucket - - return nil -} - -func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string { - return t.Format(time.RFC3339) // Like “2006-01-02T15:04:05Z07:00” -} - -func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time { - return time.Unix(epoch, 0) -} - -func (idb *InfluxDBv2DataRepository) LoadData( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int) (schema.JobData, error) { - - log.Infof("InfluxDB 2 Backend: Resolution Scaling not Implemented, will return default 
timestep. Requested Resolution %d", resolution) - - measurementsConds := make([]string, 0, len(metrics)) - for _, m := range metrics { - measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) - } - measurementsCond := strings.Join(measurementsConds, " or ") - - hostsConds := make([]string, 0, len(job.Resources)) - for _, h := range job.Resources { - if h.HWThreads != nil || h.Accelerators != nil { - // TODO - return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators") - } - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) - } - hostsCond := strings.Join(hostsConds, " or ") - - jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC - // Requested Scopes - for _, scope := range scopes { - query := "" - switch scope { - case "node": - // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows <-- Resolution could be added here? - // log.Info("Scope 'node' requested. ") - query = fmt.Sprintf(` - from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => (%s) and (%s) ) - |> drop(columns: ["_start", "_stop"]) - |> group(columns: ["hostname", "_measurement"]) - |> aggregateWindow(every: 60s, fn: mean) - |> drop(columns: ["_time"])`, - idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))), - measurementsCond, hostsCond) - case "socket": - log.Info("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ") - continue - case "core": - log.Info(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ") - continue - // Get Finest Granularity only, Set NULL to 0.0 - // query = fmt.Sprintf(` - // from(bucket: "%s") - // |> range(start: %s, stop: %s) - // |> filter(fn: (r) => %s ) - // |> filter(fn: (r) => %s ) - // |> drop(columns: ["_start", "_stop", "cluster"]) - // |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, - // idb.bucket, - // idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - // measurementsCond, hostsCond) - case "hwthread": - log.Info(" Scope 'hwthread' requested, but not yet supported: Will return 'node' scope only. ") - continue - case "accelerator": - log.Info(" Scope 'accelerator' requested, but not yet supported: Will return 'node' scope only. ") - continue - default: - log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope) - continue - // return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'") - } - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - log.Error("Error while performing query") - return nil, err - } - - // Init Metrics: Only Node level now -> TODO: Matching /check on scope level ... - for _, metric := range metrics { - jobMetric, ok := jobData[metric] - if !ok { - mc := archive.GetMetricConfig(job.Cluster, metric) - jobMetric = map[schema.MetricScope]*schema.JobMetric{ - scope: { // uses scope var from above! 
- Unit: mc.Unit, - Timestep: mc.Timestep, - Series: make([]schema.Series, 0, len(job.Resources)), - StatisticsSeries: nil, // Should be: &schema.StatsSeries{}, - }, - } - } - jobData[metric] = jobMetric - } - - // Process Result: Time-Data - field, host, hostSeries := "", "", schema.Series{} - // typeId := 0 - switch scope { - case "node": - for rows.Next() { - row := rows.Record() - if host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() { - if host != "" { - // Append Series before reset - jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - } - field, host = row.Measurement(), row.ValueByKey("hostname").(string) - hostSeries = schema.Series{ - Hostname: host, - Statistics: schema.MetricStatistics{}, //TODO Add Statistics - Data: make([]schema.Float, 0), - } - } - val, ok := row.Value().(float64) - if ok { - hostSeries.Data = append(hostSeries.Data, schema.Float(val)) - } else { - hostSeries.Data = append(hostSeries.Data, schema.Float(0)) - } - } - case "socket": - continue - case "accelerator": - continue - case "hwthread": - // See below @ core - continue - case "core": - continue - // Include Series.Id in hostSeries - // for rows.Next() { - // row := rows.Record() - // if ( host == "" || host != row.ValueByKey("hostname").(string) || typeId != row.ValueByKey("type-id").(int) || rows.TableChanged() ) { - // if ( host != "" ) { - // // Append Series before reset - // jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - // } - // field, host, typeId = row.Measurement(), row.ValueByKey("hostname").(string), row.ValueByKey("type-id").(int) - // hostSeries = schema.Series{ - // Hostname: host, - // Id: &typeId, - // Statistics: nil, - // Data: make([]schema.Float, 0), - // } - // } - // val := row.Value().(float64) - // hostSeries.Data = append(hostSeries.Data, schema.Float(val)) - // } - default: - log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope) - continue - // return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'") - } - // Append last Series - jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - } - - // Get Stats - stats, err := idb.LoadStats(job, metrics, ctx) - if err != nil { - log.Warn("Error while loading statistics") - return nil, err - } - - for _, scope := range scopes { - if scope == "node" { // No 'socket/core' support yet - for metric, nodes := range stats { - for node, stats := range nodes { - for index, _ := range jobData[metric][scope].Series { - if jobData[metric][scope].Series[index].Hostname == node { - jobData[metric][scope].Series[index].Statistics = schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max} - } - } - } - } - } - } - - return jobData, nil -} - -func (idb *InfluxDBv2DataRepository) LoadStats( - job *schema.Job, - metrics []string, - ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { - - stats := map[string]map[string]schema.MetricStatistics{} - - hostsConds := make([]string, 0, len(job.Resources)) - for _, h := range job.Resources { - if h.HWThreads != nil || h.Accelerators != nil { - // TODO - return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators") - } - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) - } - hostsCond := strings.Join(hostsConds, " or ") - - // lenMet := len(metrics) - - for _, metric := range 
metrics { - // log.Debugf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet) - - query := fmt.Sprintf(` - data = from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "%s" and r._field == "value" and (%s)) - union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), - data |> min(column: "_value") |> set(key: "_field", value: "min"), - data |> max(column: "_value") |> set(key: "_field", value: "max")]) - |> pivot(rowKey: ["hostname"], columnKey: ["_field"], valueColumn: "_value") - |> group()`, - idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))), - metric, hostsCond) - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - log.Error("Error while performing query") - return nil, err - } - - nodes := map[string]schema.MetricStatistics{} - for rows.Next() { - row := rows.Record() - host := row.ValueByKey("hostname").(string) - - avg, avgok := row.ValueByKey("avg").(float64) - if !avgok { - // log.Debugf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg) - avg = 0.0 - } - min, minok := row.ValueByKey("min").(float64) - if !minok { - // log.Debugf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min) - min = 0.0 - } - max, maxok := row.ValueByKey("max").(float64) - if !maxok { - // log.Debugf(">> Assertion error for metric %s, statistic MAX. Expected 'float64', got %v", metric, max) - max = 0.0 - } - - nodes[host] = schema.MetricStatistics{ - Avg: avg, - Min: min, - Max: max, - } - } - stats[metric] = nodes - } - - return stats, nil -} - -// Used in Job-View StatsTable -// UNTESTED -func (idb *InfluxDBv2DataRepository) LoadScopedStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context) (schema.ScopedJobStats, error) { - - // Assumption: idb.loadData() only returns series node-scope - use node scope for statsTable - scopedJobStats := make(schema.ScopedJobStats) - data, err := idb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) - if err != nil { - log.Warn("Error while loading job for scopedJobStats") - return nil, err - } - - for metric, metricData := range data { - for _, scope := range scopes { - if scope != schema.MetricScopeNode { - logOnce.Do(func() { - log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) - }) - continue - } - - if _, ok := scopedJobStats[metric]; !ok { - scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) - } - - if _, ok := scopedJobStats[metric][scope]; !ok { - scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) - } - - for _, series := range metricData[scope].Series { - scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ - Hostname: series.Hostname, - Data: &series.Statistics, - }) - } - } - } - - return scopedJobStats, nil -} - -// Used in Systems-View @ Node-Overview -// UNTESTED -func (idb *InfluxDBv2DataRepository) LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { - - // Note: scopes[] Array will be ignored, only return node scope - - // CONVERT ARGS TO INFLUX - measurementsConds := make([]string, 0) - for _, m := range metrics { - measurementsConds = 
append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) - } - measurementsCond := strings.Join(measurementsConds, " or ") - - hostsConds := make([]string, 0) - if nodes == nil { - var allNodes []string - subClusterNodeLists := archive.NodeLists[cluster] - for _, nodeList := range subClusterNodeLists { - allNodes = append(nodes, nodeList.PrintList()...) - } - for _, node := range allNodes { - nodes = append(nodes, node) - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node)) - } - } else { - for _, node := range nodes { - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node)) - } - } - hostsCond := strings.Join(hostsConds, " or ") - - // BUILD AND PERFORM QUERY - query := fmt.Sprintf(` - from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => (%s) and (%s) ) - |> drop(columns: ["_start", "_stop"]) - |> group(columns: ["hostname", "_measurement"]) - |> aggregateWindow(every: 60s, fn: mean) - |> drop(columns: ["_time"])`, - idb.bucket, - idb.formatTime(from), idb.formatTime(to), - measurementsCond, hostsCond) - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - log.Error("Error while performing query") - return nil, err - } - - // HANDLE QUERY RETURN - // Collect Float Arrays for Node@Metric -> No Scope Handling! - influxData := make(map[string]map[string][]schema.Float) - for rows.Next() { - row := rows.Record() - host, field := row.ValueByKey("hostname").(string), row.Measurement() - - influxHostData, ok := influxData[host] - if !ok { - influxHostData = make(map[string][]schema.Float) - influxData[host] = influxHostData - } - - influxFieldData, ok := influxData[host][field] - if !ok { - influxFieldData = make([]schema.Float, 0) - influxData[host][field] = influxFieldData - } - - val, ok := row.Value().(float64) - if ok { - influxData[host][field] = append(influxData[host][field], schema.Float(val)) - } else { - influxData[host][field] = append(influxData[host][field], schema.Float(0)) - } - } - - // BUILD FUNCTION RETURN - data := make(map[string]map[string][]*schema.JobMetric) - for node, metricData := range influxData { - - nodeData, ok := data[node] - if !ok { - nodeData = make(map[string][]*schema.JobMetric) - data[node] = nodeData - } - - for metric, floatArray := range metricData { - avg, min, max := 0.0, 0.0, 0.0 - for _, val := range floatArray { - avg += float64(val) - min = math.Min(min, float64(val)) - max = math.Max(max, float64(val)) - } - - stats := schema.MetricStatistics{ - Avg: (math.Round((avg/float64(len(floatArray)))*100) / 100), - Min: (math.Round(min*100) / 100), - Max: (math.Round(max*100) / 100), - } - - mc := archive.GetMetricConfig(cluster, metric) - nodeData[metric] = append(nodeData[metric], &schema.JobMetric{ - Unit: mc.Unit, - Timestep: mc.Timestep, - Series: []schema.Series{ - { - Hostname: node, - Statistics: stats, - Data: floatArray, - }, - }, - }) - } - } - - return data, nil -} - -// Used in Systems-View @ Node-List -// UNTESTED -func (idb *InfluxDBv2DataRepository) LoadNodeListData( - cluster, subCluster, nodeFilter string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - page *model.PageRequest, - ctx context.Context, -) (map[string]schema.JobData, int, bool, error) { - - // Assumption: idb.loadData() only returns series node-scope - use node scope for NodeList - - // 0) Init additional vars - var totalNodes int = 0 - var hasNextPage bool = false - - // 1) Get list of all nodes - var nodes []string - if subCluster != 
"" { - scNodes := archive.NodeLists[cluster][subCluster] - nodes = scNodes.PrintList() - } else { - subClusterNodeLists := archive.NodeLists[cluster] - for _, nodeList := range subClusterNodeLists { - nodes = append(nodes, nodeList.PrintList()...) - } - } - - // 2) Filter nodes - if nodeFilter != "" { - filteredNodes := []string{} - for _, node := range nodes { - if strings.Contains(node, nodeFilter) { - filteredNodes = append(filteredNodes, node) - } - } - nodes = filteredNodes - } - - // 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ... - totalNodes = len(nodes) - sort.Strings(nodes) - - // 3) Apply paging - if len(nodes) > page.ItemsPerPage { - start := (page.Page - 1) * page.ItemsPerPage - end := start + page.ItemsPerPage - if end > len(nodes) { - end = len(nodes) - hasNextPage = false - } else { - hasNextPage = true - } - nodes = nodes[start:end] - } - - // 4) Fetch And Convert Data, use idb.LoadNodeData() for query - - rawNodeData, err := idb.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) - if err != nil { - log.Error(fmt.Sprintf("Error while loading influx nodeData for nodeListData %#v\n", err)) - return nil, totalNodes, hasNextPage, err - } - - data := make(map[string]schema.JobData) - for node, nodeData := range rawNodeData { - // Init Nested Map Data Structures If Not Found - hostData, ok := data[node] - if !ok { - hostData = make(schema.JobData) - data[node] = hostData - } - - for metric, nodeMetricData := range nodeData { - metricData, ok := hostData[metric] - if !ok { - metricData = make(map[schema.MetricScope]*schema.JobMetric) - data[node][metric] = metricData - } - - data[node][metric][schema.MetricScopeNode] = nodeMetricData[0] // Only Node Scope Returned from loadNodeData - } - } - - return data, totalNodes, hasNextPage, nil -} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index f30d837..e6b739a 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -54,8 +54,6 @@ func Init() error { switch kind.Kind { case "cc-metric-store": mdr = &CCMetricStore{} - case "influxdb": - mdr = &InfluxDBv2DataRepository{} case "prometheus": mdr = &PrometheusDataRepository{} case "test": diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index d16501e..fa49764 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -279,8 +279,8 @@ func (pdb *PrometheusDataRepository) LoadData( for i, resource := range job.Resources { nodes[i] = resource.Hostname } - from := job.StartTime - to := job.StartTime.Add(time.Duration(job.Duration) * time.Second) + from := time.Unix(job.StartTime, 0) + to := time.Unix(job.StartTime+int64(job.Duration), 0) for _, scope := range scopes { if scope != schema.MetricScopeNode { @@ -453,8 +453,8 @@ func (pdb *PrometheusDataRepository) LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, - ctx context.Context) (schema.ScopedJobStats, error) { - + ctx context.Context, +) (schema.ScopedJobStats, error) { // Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable scopedJobStats := make(schema.ScopedJobStats) data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) @@ -502,7 +502,6 @@ func (pdb *PrometheusDataRepository) LoadNodeListData( page *model.PageRequest, ctx context.Context, ) (map[string]schema.JobData, int, bool, error) { - // Assumption: pdb.loadData() only returns series 
node-scope - use node scope for NodeList
 	// 0) Init additional vars
diff --git a/internal/repository/job.go b/internal/repository/job.go
index 3702099..c800141 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -73,7 +73,7 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
 	if err := row.Scan(
 		&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
-		&job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
+		&job.StartTime, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
 		&job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
 		&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
 		log.Warnf("Error while scanning rows (Job): %v", err)
@@ -92,10 +92,9 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
 	}
 	job.RawFootprint = nil
-	job.StartTime = time.Unix(job.StartTimeUnix, 0)
 	// Always ensure accurate duration for running jobs
 	if job.State == schema.JobStateRunning {
-		job.Duration = int32(time.Since(job.StartTime).Seconds())
+		job.Duration = int32(time.Now().Unix() - job.StartTime)
 	}
 	return job, nil
@@ -582,7 +581,7 @@ func (r *JobRepository) MarkArchived(
 func (r *JobRepository) UpdateEnergy(
 	stmt sq.UpdateBuilder,
-	jobMeta *schema.JobMeta,
+	jobMeta *schema.Job,
 ) (sq.UpdateBuilder, error) {
 	/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
 	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
@@ -632,7 +631,7 @@ func (r *JobRepository) UpdateFootprint(
 	stmt sq.UpdateBuilder,
-	jobMeta *schema.JobMeta,
+	jobMeta *schema.Job,
 ) (sq.UpdateBuilder, error) {
 	/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
 	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go
index f286c68..1508c8d 100644
--- a/internal/repository/jobCreate.go
+++ b/internal/repository/jobCreate.go
@@ -29,7 +29,7 @@ const NamedJobInsert string = `INSERT INTO job (
 	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
 );`
-func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
+func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
 	r.Mutex.Lock()
 	res, err := r.DB.NamedExec(NamedJobCacheInsert, job)
 	r.Mutex.Unlock()
@@ -87,7 +87,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 // Start inserts a new job in the table, returning the unique job ID.
 // Statistics are not transferred!
-func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { +func (r *JobRepository) Start(job *schema.Job) (id int64, err error) { job.RawFootprint, err = json.Marshal(job.Footprint) if err != nil { return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err) diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index b820084..2acdb87 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -227,7 +227,7 @@ func (r *JobRepository) FindConcurrentJobs( var startTime int64 var stopTime int64 - startTime = job.StartTimeUnix + startTime = job.StartTime hostname := job.Resources[0].Hostname if job.State == schema.JobStateRunning { diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index 363bb6c..bf7abd9 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -24,7 +24,7 @@ func TestFind(t *testing.T) { // fmt.Printf("%+v", job) - if job.ID != 5 { + if *job.ID != 5 { t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID) } } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 410ba6c..7a5078f 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -291,7 +291,7 @@ func (r *JobRepository) JobsStats( return stats, nil } -func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 { +func LoadJobStat(job *schema.Job, metric string, statType string) float64 { if stats, ok := job.Statistics[metric]; ok { switch statType { case "avg": @@ -759,7 +759,6 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( filters []*model.JobFilter, bins *int, ) []*model.MetricHistoPoints { - // Get Jobs jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil) if err != nil { diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db index 43ec9d3..c65dfd0 100644 Binary files a/internal/repository/testdata/job.db and b/internal/repository/testdata/job.db differ diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go index 6fd3fae..0af7096 100644 --- a/internal/tagger/classifyJob.go +++ b/internal/tagger/classifyJob.go @@ -301,7 +301,7 @@ func (t *JobClassTagger) Match(job *schema.Job) { } if match.(bool) { log.Info("Rule matches!") - id := job.ID + id := *job.ID if !r.HasTag(id, t.tagType, tag) { r.AddTagOrCreateDirect(id, t.tagType, tag) } diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index 7945b48..9e4bf29 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -105,7 +105,7 @@ func (t *AppTagger) Match(job *schema.Job) { jobscript, ok := metadata["jobScript"] if ok { - id := job.ID + id := *job.ID out: for _, a := range t.apps { diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index a220855..f417ad4 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -73,11 +73,7 @@ func RegisterFootprintWorker() { continue } - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } + job.Statistics = make(map[string]schema.JobStatistics) for _, metric := range allMetrics { avg, min, max := 0.0, 0.0, 0.0 @@ -95,7 +91,7 @@ func RegisterFootprintWorker() { } // Add values rounded to 2 digits: repo.LoadStats may return unrounded - 
@@ -95,7 +91,7 @@ func RegisterFootprintWorker() {
 				}

 				// Add values rounded to 2 digits: repo.LoadStats may return unrounded
-				jobMeta.Statistics[metric] = schema.JobStatistics{
+				job.Statistics[metric] = schema.JobStatistics{
 					Unit: schema.Unit{
 						Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
 						Base:   archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
@@ -108,7 +104,7 @@ func RegisterFootprintWorker() {

 			// Build Statement per Job, Add to Pending Array
 			stmt := sq.Update("job")
-			stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta)
+			stmt, err = jobRepo.UpdateFootprint(stmt, job)
 			if err != nil {
 				log.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error())
 				ce++
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index cd457eb..c221e91 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -23,7 +23,7 @@ type ArchiveBackend interface {

 	Exists(job *schema.Job) bool

-	LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
+	LoadJobMeta(job *schema.Job) (*schema.Job, error)

 	LoadJobData(job *schema.Job) (schema.JobData, error)

@@ -31,9 +31,9 @@ type ArchiveBackend interface {

 	LoadClusterCfg(name string) (*schema.Cluster, error)

-	StoreJobMeta(jobMeta *schema.JobMeta) error
+	StoreJobMeta(jobMeta *schema.Job) error

-	ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error
+	ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error

 	GetClusters() []string

@@ -51,7 +51,7 @@ type ArchiveBackend interface {
 }

 type JobContainer struct {
-	Meta *schema.JobMeta
+	Meta *schema.Job
 	Data *schema.JobData
 }

@@ -162,7 +162,6 @@ func LoadScopedStatsFromArchive(
 	metrics []string,
 	scopes []schema.MetricScope,
 ) (schema.ScopedJobStats, error) {
-
 	data, err := ar.LoadJobStats(job)
 	if err != nil {
 		log.Errorf("Error while loading job stats from archiveBackend: %s", err.Error())
diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go
index ac00ea1..ba53e38 100644
--- a/pkg/archive/archive_test.go
+++ b/pkg/archive/archive_test.go
@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"path/filepath"
 	"testing"
-	"time"

 	"github.com/ClusterCockpit/cc-backend/internal/util"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
@@ -32,12 +31,12 @@ func setup(t *testing.T) archive.ArchiveBackend {
 	jobs[0] = &schema.Job{}
 	jobs[0].JobID = 1403244
 	jobs[0].Cluster = "emmy"
-	jobs[0].StartTime = time.Unix(1608923076, 0)
+	jobs[0].StartTime = 1608923076

 	jobs[1] = &schema.Job{}
 	jobs[0].JobID = 1404397
 	jobs[0].Cluster = "emmy"
-	jobs[0].StartTime = time.Unix(1609300556, 0)
+	jobs[0].StartTime = 1609300556

 	return archive.GetHandle()
 }
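After the archive.go hunks above, every ArchiveBackend method traffics in *schema.Job. A toy in-memory backend sketch showing the unified call shape (all types here are reduced stand-ins for the schema package, not the real API):

```go
package main

import "fmt"

// Job and JobData are stand-ins for schema.Job and schema.JobData.
type Job struct {
	JobID     int64
	Cluster   string
	StartTime int64
}

type JobData map[string]float64

// ArchiveBackend mirrors the unified shape from the diff: no JobMeta anywhere.
type ArchiveBackend interface {
	LoadJobMeta(job *Job) (*Job, error)
	StoreJobMeta(job *Job) error
	ImportJob(jobMeta *Job, jobData *JobData) error
}

// memArchive is a toy in-memory backend, just to exercise the interface.
type memArchive struct{ metas map[int64]*Job }

func (m *memArchive) LoadJobMeta(job *Job) (*Job, error) {
	if j, ok := m.metas[job.JobID]; ok {
		return j, nil
	}
	return nil, fmt.Errorf("job %d not archived", job.JobID)
}

func (m *memArchive) StoreJobMeta(job *Job) error {
	m.metas[job.JobID] = job
	return nil
}

func (m *memArchive) ImportJob(jobMeta *Job, jobData *JobData) error {
	return m.StoreJobMeta(jobMeta) // metric data handling omitted in this sketch
}

func main() {
	var ar ArchiveBackend = &memArchive{metas: map[int64]*Job{}}
	j := &Job{JobID: 1403244, Cluster: "emmy", StartTime: 1608923076}
	if err := ar.StoreJobMeta(j); err != nil {
		panic(err)
	}
	got, _ := ar.LoadJobMeta(j)
	fmt.Printf("%+v\n", *got)
}
```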
diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go
index 95520a0..04d1349 100644
--- a/pkg/archive/clusterConfig.go
+++ b/pkg/archive/clusterConfig.go
@@ -223,7 +223,7 @@ func GetMetricConfig(cluster, metric string) *schema.MetricConfig {

 // AssignSubCluster sets the `job.subcluster` property of the job based
 // on its cluster and resources.
-func AssignSubCluster(job *schema.BaseJob) error {
+func AssignSubCluster(job *schema.Job) error {
 	cluster := GetCluster(job.Cluster)
 	if cluster == nil {
 		return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", job.Cluster)
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index a59b663..a90c092 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -53,7 +53,7 @@ func getDirectory(
 		rootPath,
 		job.Cluster,
 		lvl1, lvl2,
-		strconv.FormatInt(job.StartTime.Unix(), 10))
+		strconv.FormatInt(job.StartTime, 10))
 }

 func getPath(
@@ -65,15 +65,15 @@ func getPath(
 		getDirectory(job, rootPath),
 		file)
 }

-func loadJobMeta(filename string) (*schema.JobMeta, error) {
+func loadJobMeta(filename string) (*schema.Job, error) {
 	b, err := os.ReadFile(filename)
 	if err != nil {
 		log.Errorf("loadJobMeta() > open file error: %v", err)
-		return &schema.JobMeta{}, err
+		return nil, err
 	}
 	if config.Keys.Validate {
 		if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
-			return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
+			return nil, fmt.Errorf("validate job meta: %v", err)
 		}
 	}
@@ -429,7 +429,7 @@ func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
 	return loadJobStats(filename, isCompressed)
 }

-func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
+func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.Job, error) {
 	filename := getPath(job, fsa.path, "meta.json")
 	return loadJobMeta(filename)
 }
@@ -518,18 +518,13 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 	return ch
 }

-func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
-	job := schema.Job{
-		BaseJob:       jobMeta.BaseJob,
-		StartTime:     time.Unix(jobMeta.StartTime, 0),
-		StartTimeUnix: jobMeta.StartTime,
-	}
-	f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
+func (fsa *FsArchive) StoreJobMeta(job *schema.Job) error {
+	f, err := os.Create(getPath(job, fsa.path, "meta.json"))
 	if err != nil {
 		log.Error("Error while creating filepath for meta.json")
 		return err
 	}
-	if err := EncodeJobMeta(f, jobMeta); err != nil {
+	if err := EncodeJobMeta(f, job); err != nil {
 		log.Error("Error while encoding job metadata to meta.json file")
 		return err
 	}
@@ -546,15 +541,10 @@ func (fsa *FsArchive) GetClusters() []string {
 }

 func (fsa *FsArchive) ImportJob(
-	jobMeta *schema.JobMeta,
+	jobMeta *schema.Job,
 	jobData *schema.JobData,
 ) error {
-	job := schema.Job{
-		BaseJob:       jobMeta.BaseJob,
-		StartTime:     time.Unix(jobMeta.StartTime, 0),
-		StartTimeUnix: jobMeta.StartTime,
-	}
-	dir := getPath(&job, fsa.path, "")
+	dir := getPath(jobMeta, fsa.path, "")
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		log.Error("Error while creating job archive path")
 		return err
diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go
index 9db68ed..ddb430a 100644
--- a/pkg/archive/fsBackend_test.go
+++ b/pkg/archive/fsBackend_test.go
@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"path/filepath"
 	"testing"
-	"time"

 	"github.com/ClusterCockpit/cc-backend/internal/util"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -86,8 +85,11 @@ func TestLoadJobMeta(t *testing.T) {
 		t.Fatal(err)
 	}

-	jobIn := schema.Job{BaseJob: schema.JobDefaults}
-	jobIn.StartTime = time.Unix(1608923076, 0)
+	jobIn := schema.Job{
+		Exclusive:        1,
+		MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
+	}
+	jobIn.StartTime = 1608923076
 	jobIn.JobID = 1403244
 	jobIn.Cluster = "emmy"
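getDirectory above now formats the unix StartTime with strconv.FormatInt instead of a time.Time round-trip. A sketch of the resulting archive path; the jobID/1000 and jobID%1000 bucketing is assumed from the surrounding cc-backend layout, and the root path is purely illustrative:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

// Job is a stand-in for schema.Job with only the fields the path needs.
type Job struct {
	JobID     int64
	Cluster   string
	StartTime int64
}

// getDirectory sketches the cluster/lvl1/lvl2/starttime layout; the
// thousands-bucket split is an assumption taken from the surrounding code.
func getDirectory(job *Job, rootPath string) string {
	lvl1 := strconv.FormatInt(job.JobID/1000, 10)
	lvl2 := fmt.Sprintf("%03d", job.JobID%1000)
	return filepath.Join(rootPath, job.Cluster, lvl1, lvl2,
		strconv.FormatInt(job.StartTime, 10)) // no .Unix() call anymore
}

func main() {
	j := &Job{JobID: 1403244, Cluster: "emmy", StartTime: 1608923076}
	fmt.Println(getDirectory(j, "/var/lib/archive")) // hypothetical root
}
```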
@@ -114,8 +116,11 @@ func TestLoadJobData(t *testing.T) {
 		t.Fatal(err)
 	}

-	jobIn := schema.Job{BaseJob: schema.JobDefaults}
-	jobIn.StartTime = time.Unix(1608923076, 0)
+	jobIn := schema.Job{
+		Exclusive:        1,
+		MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
+	}
+	jobIn.StartTime = 1608923076
 	jobIn.JobID = 1403244
 	jobIn.Cluster = "emmy"
@@ -142,8 +147,11 @@ func BenchmarkLoadJobData(b *testing.B) {

 	var fsa FsArchive
 	fsa.Init(json.RawMessage(archiveCfg))

-	jobIn := schema.Job{BaseJob: schema.JobDefaults}
-	jobIn.StartTime = time.Unix(1608923076, 0)
+	jobIn := schema.Job{
+		Exclusive:        1,
+		MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
+	}
+	jobIn.StartTime = 1608923076
 	jobIn.JobID = 1403244
 	jobIn.Cluster = "emmy"
@@ -165,8 +173,11 @@ func BenchmarkLoadJobDataCompressed(b *testing.B) {

 	var fsa FsArchive
 	fsa.Init(json.RawMessage(archiveCfg))

-	jobIn := schema.Job{BaseJob: schema.JobDefaults}
-	jobIn.StartTime = time.Unix(1608923076, 0)
+	jobIn := schema.Job{
+		Exclusive:        1,
+		MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
+	}
+	jobIn.StartTime = 1608923076
 	jobIn.JobID = 1403244
 	jobIn.Cluster = "emmy"
diff --git a/pkg/archive/json.go b/pkg/archive/json.go
index 5201b74..d3639f5 100644
--- a/pkg/archive/json.go
+++ b/pkg/archive/json.go
@@ -69,8 +69,8 @@ func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) {
 	return nil, err
 }

-func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
-	var d schema.JobMeta
+func DecodeJobMeta(r io.Reader) (*schema.Job, error) {
+	var d schema.Job
 	if err := json.NewDecoder(r).Decode(&d); err != nil {
 		log.Warn("Error while decoding raw job meta json")
 		return &d, err
@@ -103,7 +103,7 @@ func EncodeJobData(w io.Writer, d *schema.JobData) error {
 	return nil
 }

-func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
+func EncodeJobMeta(w io.Writer, d *schema.Job) error {
 	// Sanitize parameters
 	if err := json.NewEncoder(w).Encode(d); err != nil {
 		log.Warn("Error while encoding new job meta json")
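DecodeJobMeta and EncodeJobMeta above now read and write the unified Job struct directly. A self-contained sketch of the decode side (reduced field set; the real struct carries many more fields and tags):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Job is a stand-in for schema.Job with a handful of its JSON tags.
type Job struct {
	JobID     int64  `json:"jobId"`
	Cluster   string `json:"cluster"`
	StartTime int64  `json:"startTime"`
}

// decodeJobMeta mirrors the shape of DecodeJobMeta above:
// one struct in, no separate JobMeta type anymore.
func decodeJobMeta(r io.Reader) (*Job, error) {
	var d Job
	if err := json.NewDecoder(r).Decode(&d); err != nil {
		return nil, err
	}
	return &d, nil
}

func main() {
	in := strings.NewReader(`{"jobId":1403244,"cluster":"emmy","startTime":1608923076}`)
	job, err := decodeJobMeta(in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *job)
}
```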
diff --git a/pkg/schema/job.go b/pkg/schema/job.go
index 7475c36..ef1ecde 100644
--- a/pkg/schema/job.go
+++ b/pkg/schema/job.go
@@ -8,43 +8,8 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"time"
 )

-// BaseJob is the common part of the job metadata structs
-//
-// Common subset of Job and JobMeta. Use one of those, not this type directly.
-
-type BaseJob struct {
-	Cluster            string             `json:"cluster" db:"cluster" example:"fritz"`
-	SubCluster         string             `json:"subCluster" db:"subcluster" example:"main"`
-	Partition          string             `json:"partition,omitempty" db:"cluster_partition" example:"main"`
-	Project            string             `json:"project" db:"project" example:"abcd200"`
-	User               string             `json:"user" db:"hpc_user" example:"abcd100h"`
-	State              JobState           `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"`
-	Tags               []*Tag             `json:"tags,omitempty"`
-	RawEnergyFootprint []byte             `json:"-" db:"energy_footprint"`
-	RawFootprint       []byte             `json:"-" db:"footprint"`
-	RawMetaData        []byte             `json:"-" db:"meta_data"`
-	RawResources       []byte             `json:"-" db:"resources"`
-	Resources          []*Resource        `json:"resources"`
-	EnergyFootprint    map[string]float64 `json:"energyFootprint"`
-	Footprint          map[string]float64 `json:"footprint"`
-	MetaData           map[string]string  `json:"metaData"`
-	ConcurrentJobs     JobLinkResultList  `json:"concurrentJobs"`
-	Energy             float64            `json:"energy" db:"energy"`
-	ArrayJobId         int64              `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
-	Walltime           int64              `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
-	JobID              int64              `json:"jobId" db:"job_id" example:"123000"`
-	Duration           int32              `json:"duration" db:"duration" example:"43200" minimum:"1"`
-	SMT                int32              `json:"smt,omitempty" db:"smt" example:"4"`
-	MonitoringStatus   int32              `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"`
-	Exclusive          int32              `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`
-	NumAcc             int32              `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`
-	NumHWThreads       int32              `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"`
-	NumNodes           int32              `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`
-}
-
 // Job struct type
 //
 // This type is used as the GraphQL interface and using sqlx as a table row.
 //
@@ -52,10 +17,36 @@ type BaseJob struct {
 // Job model
 // @Description Information of a HPC job.
 type Job struct {
-	StartTime time.Time `json:"startTime"`
-	BaseJob
-	ID            int64 `json:"id" db:"id"`
-	StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"`
+	Cluster            string                   `json:"cluster" db:"cluster" example:"fritz"`
+	SubCluster         string                   `json:"subCluster" db:"subcluster" example:"main"`
+	Partition          string                   `json:"partition,omitempty" db:"cluster_partition" example:"main"`
+	Project            string                   `json:"project" db:"project" example:"abcd200"`
+	User               string                   `json:"user" db:"hpc_user" example:"abcd100h"`
+	State              JobState                 `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"`
+	Tags               []*Tag                   `json:"tags,omitempty"`
+	RawEnergyFootprint []byte                   `json:"-" db:"energy_footprint"`
+	RawFootprint       []byte                   `json:"-" db:"footprint"`
+	RawMetaData        []byte                   `json:"-" db:"meta_data"`
+	RawResources       []byte                   `json:"-" db:"resources"`
+	Resources          []*Resource              `json:"resources"`
+	EnergyFootprint    map[string]float64       `json:"energyFootprint"`
+	Footprint          map[string]float64       `json:"footprint"`
+	MetaData           map[string]string        `json:"metaData"`
+	ConcurrentJobs     JobLinkResultList        `json:"concurrentJobs"`
+	Energy             float64                  `json:"energy" db:"energy"`
+	ArrayJobId         int64                    `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
+	Walltime           int64                    `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
+	JobID              int64                    `json:"jobId" db:"job_id" example:"123000"`
+	Duration           int32                    `json:"duration" db:"duration" example:"43200" minimum:"1"`
+	SMT                int32                    `json:"smt,omitempty" db:"smt" example:"4"`
+	MonitoringStatus   int32                    `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"`
+	Exclusive          int32                    `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`
+	NumAcc             int32                    `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`
+	NumHWThreads       int32                    `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"`
+	NumNodes           int32                    `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`
+	Statistics         map[string]JobStatistics `json:"statistics"`
+	ID                 *int64                   `json:"id,omitempty" db:"id"`
+	StartTime          int64                    `json:"startTime" db:"start_time" example:"1649723812"`
 }

 // JobMeta struct type
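The flattened struct above turns ID into a *int64 tagged `json:"id,omitempty"`, presumably so that a job which has not been inserted into the database yet serializes without an id key at all. A small sketch of both behaviors, including the dereference the updated taggers and tests perform (reduced field set, illustrative values):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job is a stand-in for schema.Job with only the fields this sketch needs.
type Job struct {
	ID        *int64 `json:"id,omitempty"`
	JobID     int64  `json:"jobId"`
	StartTime int64  `json:"startTime"`
}

func main() {
	// Not yet inserted: the nil ID is omitted from the JSON entirely.
	fresh := Job{JobID: 123000, StartTime: 1649723812}
	b, _ := json.Marshal(fresh)
	fmt.Println(string(b)) // {"jobId":123000,"startTime":1649723812}

	// After a DB insert the pointer is set; call sites dereference it,
	// as the updated code above does with *job.ID.
	id := int64(5)
	fresh.ID = &id
	b, _ = json.Marshal(fresh)
	fmt.Println(string(b)) // {"id":5,"jobId":123000,"startTime":1649723812}
	fmt.Println(*fresh.ID) // 5
}
```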
@@ -70,12 +61,12 @@ type Job struct {
 //
 // JobMeta model
 // @Description Meta data information of a HPC job.
-type JobMeta struct {
-	ID         *int64                   `json:"id,omitempty"`
-	Statistics map[string]JobStatistics `json:"statistics"`
-	BaseJob
-	StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
-}
+// type JobMeta struct {
+// 	ID         *int64                   `json:"id,omitempty"`
+// 	BaseJob
+// 	Statistics map[string]JobStatistics `json:"statistics"`
+// 	StartTime  int64                    `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
+// }

 type JobLink struct {
 	ID int64 `json:"id"`
@@ -94,10 +85,10 @@ const (
 	MonitoringStatusArchivingSuccessful int32 = 3
 )

-var JobDefaults BaseJob = BaseJob{
-	Exclusive:        1,
-	MonitoringStatus: MonitoringStatusRunningOrArchiving,
-}
+// var JobDefaults Job = Job{
+// 	Exclusive:        1,
+// 	MonitoringStatus: MonitoringStatusRunningOrArchiving,
+// }

 type Unit struct {
 	Base string `json:"base"`
@@ -144,9 +135,13 @@ const (
 	JobStateOutOfMemory JobState = "out_of_memory"
 )

-func (j JobMeta) GoString() string {
-	return fmt.Sprintf("JobMeta{ID:%d, StartTime:%d, JobID:%v, BaseJob:%v}",
-		j.ID, j.StartTime, j.JobID, j.BaseJob)
+func (j Job) GoString() string {
+	id := int64(-1)
+	if j.ID != nil {
+		id = *j.ID
+	}
+	return fmt.Sprintf("Job{ID:%d, StartTime:%d, JobID:%v}",
+		id, j.StartTime, j.JobID)
 }

 func (e *JobState) UnmarshalGQL(v any) error {
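With JobDefaults commented out above, the updated tests construct the former defaults inline. A hypothetical convenience constructor could centralize that again; this helper is an assumption for illustration, not part of the diff:

```go
package main

import "fmt"

// Stand-ins for the schema package's constant and struct.
const MonitoringStatusRunningOrArchiving int32 = 1

type Job struct {
	Exclusive        int32
	MonitoringStatus int32
}

// NewJobWithDefaults is a hypothetical replacement for the removed
// JobDefaults value: same field values, expressed as a constructor.
func NewJobWithDefaults() *Job {
	return &Job{
		Exclusive:        1,
		MonitoringStatus: MonitoringStatusRunningOrArchiving,
	}
}

func main() {
	fmt.Printf("%+v\n", *NewJobWithDefaults())
}
```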