mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-11-13 02:17:25 +01:00
Merge branch 'dev' into add_nats_server
This commit is contained in:
commit
f61b5bb2a7
16
Makefile
16
Makefile
@ -24,9 +24,19 @@ SVELTE_COMPONENTS = status \
|
|||||||
SVELTE_TARGETS = $(addprefix $(FRONTEND)/public/build/,$(addsuffix .js, $(SVELTE_COMPONENTS)))
|
SVELTE_TARGETS = $(addprefix $(FRONTEND)/public/build/,$(addsuffix .js, $(SVELTE_COMPONENTS)))
|
||||||
SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
|
SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
|
||||||
$(wildcard $(FRONTEND)/src/*.js) \
|
$(wildcard $(FRONTEND)/src/*.js) \
|
||||||
$(wildcard $(FRONTEND)/src/filters/*.svelte) \
|
$(wildcard $(FRONTEND)/src/analysis/*.svelte) \
|
||||||
$(wildcard $(FRONTEND)/src/plots/*.svelte) \
|
$(wildcard $(FRONTEND)/src/config/*.svelte) \
|
||||||
$(wildcard $(FRONTEND)/src/joblist/*.svelte)
|
$(wildcard $(FRONTEND)/src/config/admin/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/config/user/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/*.js) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/filters/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/plots/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/joblist/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/helper/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/generic/select/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/header/*.svelte) \
|
||||||
|
$(wildcard $(FRONTEND)/src/job/*.svelte)
|
||||||
|
|
||||||
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
|
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
|
||||||
|
|
||||||
|
@ -182,7 +182,7 @@ func main() {
|
|||||||
log.Fatalf("failed to initialize archive: %s", err.Error())
|
log.Fatalf("failed to initialize archive: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
|
if err := metricdata.Init(); err != nil {
|
||||||
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
|
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/99designs/gqlgen/graphql/handler"
|
"github.com/99designs/gqlgen/graphql/handler"
|
||||||
"github.com/99designs/gqlgen/graphql/playground"
|
"github.com/99designs/gqlgen/graphql/playground"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/api"
|
"github.com/ClusterCockpit/cc-backend/internal/api"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||||
@ -38,6 +39,15 @@ var (
|
|||||||
apiHandle *api.RestApi
|
apiHandle *api.RestApi
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// onFailureResponse is the shared failure callback handed to the auth
// middlewares. It answers the request with HTTP 401 and a JSON object that
// carries the status text under "status" and the error message under "error".
func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) {
	const code = http.StatusUnauthorized

	payload := map[string]string{
		"status": http.StatusText(code),
		"error":  err.Error(),
	}

	// Headers have to be in place before WriteHeader sends the status line.
	rw.Header().Add("Content-Type", "application/json")
	rw.WriteHeader(code)
	json.NewEncoder(rw).Encode(payload)
}
|
||||||
|
|
||||||
func serverInit() {
|
func serverInit() {
|
||||||
// Setup the http.Handler/Router used by the server
|
// Setup the http.Handler/Router used by the server
|
||||||
graph.Init()
|
graph.Init()
|
||||||
@ -166,64 +176,32 @@ func serverInit() {
|
|||||||
return authHandle.AuthApi(
|
return authHandle.AuthApi(
|
||||||
// On success;
|
// On success;
|
||||||
next,
|
next,
|
||||||
|
|
||||||
// On failure: JSON Response
|
// On failure: JSON Response
|
||||||
func(rw http.ResponseWriter, r *http.Request, err error) {
|
onFailureResponse)
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
|
||||||
rw.WriteHeader(http.StatusUnauthorized)
|
|
||||||
json.NewEncoder(rw).Encode(map[string]string{
|
|
||||||
"status": http.StatusText(http.StatusUnauthorized),
|
|
||||||
"error": err.Error(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
userapi.Use(func(next http.Handler) http.Handler {
|
userapi.Use(func(next http.Handler) http.Handler {
|
||||||
return authHandle.AuthUserApi(
|
return authHandle.AuthUserApi(
|
||||||
// On success;
|
// On success;
|
||||||
next,
|
next,
|
||||||
|
|
||||||
// On failure: JSON Response
|
// On failure: JSON Response
|
||||||
func(rw http.ResponseWriter, r *http.Request, err error) {
|
onFailureResponse)
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
|
||||||
rw.WriteHeader(http.StatusUnauthorized)
|
|
||||||
json.NewEncoder(rw).Encode(map[string]string{
|
|
||||||
"status": http.StatusText(http.StatusUnauthorized),
|
|
||||||
"error": err.Error(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
configapi.Use(func(next http.Handler) http.Handler {
|
configapi.Use(func(next http.Handler) http.Handler {
|
||||||
return authHandle.AuthConfigApi(
|
return authHandle.AuthConfigApi(
|
||||||
// On success;
|
// On success;
|
||||||
next,
|
next,
|
||||||
|
|
||||||
// On failure: JSON Response
|
// On failure: JSON Response
|
||||||
func(rw http.ResponseWriter, r *http.Request, err error) {
|
onFailureResponse)
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
|
||||||
rw.WriteHeader(http.StatusUnauthorized)
|
|
||||||
json.NewEncoder(rw).Encode(map[string]string{
|
|
||||||
"status": http.StatusText(http.StatusUnauthorized),
|
|
||||||
"error": err.Error(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
frontendapi.Use(func(next http.Handler) http.Handler {
|
frontendapi.Use(func(next http.Handler) http.Handler {
|
||||||
return authHandle.AuthFrontendApi(
|
return authHandle.AuthFrontendApi(
|
||||||
// On success;
|
// On success;
|
||||||
next,
|
next,
|
||||||
|
|
||||||
// On failure: JSON Response
|
// On failure: JSON Response
|
||||||
func(rw http.ResponseWriter, r *http.Request, err error) {
|
onFailureResponse)
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
|
||||||
rw.WriteHeader(http.StatusUnauthorized)
|
|
||||||
json.NewEncoder(rw).Encode(map[string]string{
|
|
||||||
"status": http.StatusText(http.StatusUnauthorized),
|
|
||||||
"error": err.Error(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,8 +261,8 @@ func serverStart() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
server = &http.Server{
|
server = &http.Server{
|
||||||
ReadTimeout: 10 * time.Second,
|
ReadTimeout: 20 * time.Second,
|
||||||
WriteTimeout: 10 * time.Second,
|
WriteTimeout: 20 * time.Second,
|
||||||
Handler: handler,
|
Handler: handler,
|
||||||
Addr: config.Keys.Addr,
|
Addr: config.Keys.Addr,
|
||||||
}
|
}
|
||||||
@ -331,5 +309,5 @@ func serverShutdown() {
|
|||||||
server.Shutdown(context.Background())
|
server.Shutdown(context.Background())
|
||||||
|
|
||||||
// Then, wait for any async archivings still pending...
|
// Then, wait for any async archivings still pending...
|
||||||
apiHandle.JobRepository.WaitForArchiving()
|
archiver.WaitForArchiving()
|
||||||
}
|
}
|
||||||
|
@ -19,9 +19,11 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/api"
|
"github.com/ClusterCockpit/cc-backend/internal/api"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
@ -150,10 +152,11 @@ func setup(t *testing.T) *api.RestApi {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
|
if err := metricdata.Init(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
archiver.Start(repository.GetJobRepository())
|
||||||
auth.Init()
|
auth.Init()
|
||||||
graph.Init()
|
graph.Init()
|
||||||
|
|
||||||
@ -311,7 +314,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
t.Fatal(response.Status, recorder.Body.String())
|
t.Fatal(response.Status, recorder.Body.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
restapi.JobRepository.WaitForArchiving()
|
archiver.WaitForArchiving()
|
||||||
resolver := graph.GetResolverInstance()
|
resolver := graph.GetResolverInstance()
|
||||||
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
|
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -341,7 +344,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
t.Run("CheckArchive", func(t *testing.T) {
|
t.Run("CheckArchive", func(t *testing.T) {
|
||||||
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
|
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -422,7 +425,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
t.Fatal(response.Status, recorder.Body.String())
|
t.Fatal(response.Status, recorder.Body.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
restapi.JobRepository.WaitForArchiving()
|
archiver.WaitForArchiving()
|
||||||
jobid, cluster := int64(12345), "testcluster"
|
jobid, cluster := int64(12345), "testcluster"
|
||||||
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -19,12 +19,13 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/util"
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
@ -515,7 +516,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
|
|||||||
var data schema.JobData
|
var data schema.JobData
|
||||||
|
|
||||||
if r.URL.Query().Get("all-metrics") == "true" {
|
if r.URL.Query().Get("all-metrics") == "true" {
|
||||||
data, err = metricdata.LoadData(job, nil, scopes, r.Context())
|
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return
|
return
|
||||||
@ -604,7 +605,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
scopes = []schema.MetricScope{"node"}
|
scopes = []schema.MetricScope{"node"}
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := metricdata.LoadData(job, metrics, scopes, r.Context())
|
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return
|
return
|
||||||
@ -1081,7 +1082,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Trigger async archiving
|
// Trigger async archiving
|
||||||
api.JobRepository.TriggerArchiving(job)
|
archiver.TriggerArchiving(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
94
internal/archiver/archiveWorker.go
Normal file
94
internal/archiver/archiveWorker.go
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
sq "github.com/Masterminds/squirrel"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
archivePending sync.WaitGroup
|
||||||
|
archiveChannel chan *schema.Job
|
||||||
|
jobRepo *repository.JobRepository
|
||||||
|
)
|
||||||
|
|
||||||
|
func Start(r *repository.JobRepository) {
|
||||||
|
archiveChannel = make(chan *schema.Job, 128)
|
||||||
|
jobRepo = r
|
||||||
|
|
||||||
|
go archivingWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Archiving worker thread
|
||||||
|
func archivingWorker() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case job, ok := <-archiveChannel:
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
// not using meta data, called to load JobMeta into Cache?
|
||||||
|
// will fail if job meta not in repository
|
||||||
|
if _, err := jobRepo.FetchMetadata(job); err != nil {
|
||||||
|
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
|
||||||
|
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
|
||||||
|
// TODO: Maybe use context with cancel/timeout here
|
||||||
|
jobMeta, err := ArchiveJob(job, context.Background())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
|
||||||
|
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt := sq.Update("job").Where("job.id = ?", job.ID)
|
||||||
|
|
||||||
|
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
|
||||||
|
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
|
||||||
|
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Update the jobs database entry one last time:
|
||||||
|
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
|
||||||
|
if err := jobRepo.Execute(stmt); err != nil {
|
||||||
|
log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
||||||
|
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||||
|
archivePending.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trigger async archiving
|
||||||
|
func TriggerArchiving(job *schema.Job) {
|
||||||
|
if archiveChannel == nil {
|
||||||
|
log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
|
||||||
|
}
|
||||||
|
|
||||||
|
archivePending.Add(1)
|
||||||
|
archiveChannel <- job
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for background thread to finish pending archiving operations
|
||||||
|
func WaitForArchiving() {
|
||||||
|
// close channel and wait for worker to process remaining jobs
|
||||||
|
archivePending.Wait()
|
||||||
|
}
|
82
internal/archiver/archiver.go
Normal file
82
internal/archiver/archiver.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Writes a running job to the job-archive
|
||||||
|
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
||||||
|
allMetrics := make([]string, 0)
|
||||||
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
allMetrics = append(allMetrics, mc.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
scopes := []schema.MetricScope{schema.MetricScopeNode}
|
||||||
|
// FIXME: Add a config option for this
|
||||||
|
if job.NumNodes <= 8 {
|
||||||
|
// This will add the native scope if core scope is not available
|
||||||
|
scopes = append(scopes, schema.MetricScopeCore)
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.NumAcc > 0 {
|
||||||
|
scopes = append(scopes, schema.MetricScopeAccelerator)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error wile loading job data for archiving")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobMeta := &schema.JobMeta{
|
||||||
|
BaseJob: job.BaseJob,
|
||||||
|
StartTime: job.StartTime.Unix(),
|
||||||
|
Statistics: make(map[string]schema.JobStatistics),
|
||||||
|
}
|
||||||
|
|
||||||
|
for metric, data := range jobData {
|
||||||
|
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
|
||||||
|
nodeData, ok := data["node"]
|
||||||
|
if !ok {
|
||||||
|
// This should never happen ?
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, series := range nodeData.Series {
|
||||||
|
avg += series.Statistics.Avg
|
||||||
|
min = math.Min(min, series.Statistics.Min)
|
||||||
|
max = math.Max(max, series.Statistics.Max)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobMeta.Statistics[metric] = schema.JobStatistics{
|
||||||
|
Unit: schema.Unit{
|
||||||
|
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
|
||||||
|
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
|
||||||
|
},
|
||||||
|
Avg: avg / float64(job.NumNodes),
|
||||||
|
Min: min,
|
||||||
|
Max: max,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the file based archive is disabled,
|
||||||
|
// only return the JobMeta structure as the
|
||||||
|
// statistics in there are needed.
|
||||||
|
if config.Keys.DisableArchive {
|
||||||
|
return jobMeta, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData)
|
||||||
|
}
|
@ -15,7 +15,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
@ -231,7 +231,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := metricdata.LoadData(job, metrics, scopes, ctx)
|
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -383,7 +383,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading node data")
|
log.Warn("Error while loading node data")
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -440,9 +440,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|||||||
// SubCluster returns generated.SubClusterResolver implementation.
|
// SubCluster returns generated.SubClusterResolver implementation.
|
||||||
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
||||||
|
|
||||||
type clusterResolver struct{ *Resolver }
|
type (
|
||||||
type jobResolver struct{ *Resolver }
|
clusterResolver struct{ *Resolver }
|
||||||
type metricValueResolver struct{ *Resolver }
|
jobResolver struct{ *Resolver }
|
||||||
type mutationResolver struct{ *Resolver }
|
metricValueResolver struct{ *Resolver }
|
||||||
type queryResolver struct{ *Resolver }
|
mutationResolver struct{ *Resolver }
|
||||||
type subClusterResolver struct{ *Resolver }
|
queryResolver struct{ *Resolver }
|
||||||
|
subClusterResolver struct{ *Resolver }
|
||||||
|
)
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/99designs/gqlgen/graphql"
|
"github.com/99designs/gqlgen/graphql"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
|
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
@ -24,8 +24,8 @@ func (r *queryResolver) rooflineHeatmap(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
filter []*model.JobFilter,
|
filter []*model.JobFilter,
|
||||||
rows int, cols int,
|
rows int, cols int,
|
||||||
minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
|
minX float64, minY float64, maxX float64, maxY float64,
|
||||||
|
) ([][]float64, error) {
|
||||||
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
|
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error while querying jobs for roofline")
|
log.Error("Error while querying jobs for roofline")
|
||||||
@ -47,7 +47,7 @@ func (r *queryResolver) rooflineHeatmap(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
|
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -120,7 +120,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
|
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
|
||||||
log.Error("Error while loading averages for footprint")
|
log.Error("Error while loading averages for footprint")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -77,8 +77,16 @@ func HandleImportFlag(flag string) error {
|
|||||||
job.Footprint = make(map[string]float64)
|
job.Footprint = make(map[string]float64)
|
||||||
|
|
||||||
for _, fp := range sc.Footprint {
|
for _, fp := range sc.Footprint {
|
||||||
job.Footprint[fp] = repository.LoadJobStat(&job, fp)
|
statType := "avg"
|
||||||
|
|
||||||
|
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
|
||||||
|
statType = sc.MetricConfig[i].Footprint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
name := fmt.Sprintf("%s_%s", fp, statType)
|
||||||
|
job.Footprint[fp] = repository.LoadJobStat(&job, name, statType)
|
||||||
|
}
|
||||||
|
|
||||||
job.RawFootprint, err = json.Marshal(job.Footprint)
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while marshaling job footprint")
|
log.Warn("Error while marshaling job footprint")
|
||||||
|
@ -16,6 +16,11 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
addTagQuery = "INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)"
|
||||||
|
setTagQuery = "INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)"
|
||||||
|
)
|
||||||
|
|
||||||
// Delete the tables "job", "tag" and "jobtag" from the database and
|
// Delete the tables "job", "tag" and "jobtag" from the database and
|
||||||
// repopulate them using the jobs found in `archive`.
|
// repopulate them using the jobs found in `archive`.
|
||||||
func InitDB() error {
|
func InitDB() error {
|
||||||
@ -68,7 +73,15 @@ func InitDB() error {
|
|||||||
job.Footprint = make(map[string]float64)
|
job.Footprint = make(map[string]float64)
|
||||||
|
|
||||||
for _, fp := range sc.Footprint {
|
for _, fp := range sc.Footprint {
|
||||||
job.Footprint[fp] = repository.LoadJobStat(jobMeta, fp)
|
statType := "avg"
|
||||||
|
|
||||||
|
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
|
||||||
|
statType = sc.MetricConfig[i].Footprint
|
||||||
|
}
|
||||||
|
|
||||||
|
name := fmt.Sprintf("%s_%s", fp, statType)
|
||||||
|
|
||||||
|
job.Footprint[fp] = repository.LoadJobStat(jobMeta, name, statType)
|
||||||
}
|
}
|
||||||
|
|
||||||
job.RawFootprint, err = json.Marshal(job.Footprint)
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
@ -97,7 +110,8 @@ func InitDB() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := r.TransactionAdd(t, job)
|
id, err := r.TransactionAddNamed(t,
|
||||||
|
repository.NamedJobInsert, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
@ -108,7 +122,9 @@ func InitDB() error {
|
|||||||
tagstr := tag.Name + ":" + tag.Type
|
tagstr := tag.Name + ":" + tag.Type
|
||||||
tagId, ok := tags[tagstr]
|
tagId, ok := tags[tagstr]
|
||||||
if !ok {
|
if !ok {
|
||||||
tagId, err = r.TransactionAddTag(t, tag)
|
tagId, err = r.TransactionAdd(t,
|
||||||
|
addTagQuery,
|
||||||
|
tag.Name, tag.Type)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error adding tag: %v", err)
|
log.Errorf("Error adding tag: %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
@ -117,7 +133,9 @@ func InitDB() error {
|
|||||||
tags[tagstr] = tagId
|
tags[tagstr] = tagId
|
||||||
}
|
}
|
||||||
|
|
||||||
r.TransactionSetTag(t, id, tagId)
|
r.TransactionAdd(t,
|
||||||
|
setTagQuery,
|
||||||
|
id, tagId)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
231
internal/metricDataDispatcher/dataLoader.go
Normal file
231
internal/metricDataDispatcher/dataLoader.go
Normal file
@ -0,0 +1,231 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package metricDataDispatcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
|
||||||
|
|
||||||
|
func cacheKey(
|
||||||
|
job *schema.Job,
|
||||||
|
metrics []string,
|
||||||
|
scopes []schema.MetricScope,
|
||||||
|
) string {
|
||||||
|
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
|
||||||
|
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
|
||||||
|
return fmt.Sprintf("%d(%s):[%v],[%v]",
|
||||||
|
job.ID, job.State, metrics, scopes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetches the metric data for a job.
|
||||||
|
func LoadData(job *schema.Job,
|
||||||
|
metrics []string,
|
||||||
|
scopes []schema.MetricScope,
|
||||||
|
ctx context.Context,
|
||||||
|
) (schema.JobData, error) {
|
||||||
|
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
|
||||||
|
var jd schema.JobData
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if job.State == schema.JobStateRunning ||
|
||||||
|
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
|
||||||
|
config.Keys.DisableArchive {
|
||||||
|
|
||||||
|
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if scopes == nil {
|
||||||
|
scopes = append(scopes, schema.MetricScopeNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
if metrics == nil {
|
||||||
|
cluster := archive.GetCluster(job.Cluster)
|
||||||
|
for _, mc := range cluster.MetricConfig {
|
||||||
|
metrics = append(metrics, mc.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jd, err = repo.LoadData(job, metrics, scopes, ctx)
|
||||||
|
if err != nil {
|
||||||
|
if len(jd) != 0 {
|
||||||
|
log.Warnf("partial error: %s", err.Error())
|
||||||
|
// return err, 0, 0 // Reactivating will block archiving on one partial error
|
||||||
|
} else {
|
||||||
|
log.Error("Error while loading job data from metric repository")
|
||||||
|
return err, 0, 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
size = jd.Size()
|
||||||
|
} else {
|
||||||
|
jd, err = archive.GetHandle().LoadJobData(job)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error while loading job data from archive")
|
||||||
|
return err, 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avoid sending unrequested data to the client:
|
||||||
|
if metrics != nil || scopes != nil {
|
||||||
|
if metrics == nil {
|
||||||
|
metrics = make([]string, 0, len(jd))
|
||||||
|
for k := range jd {
|
||||||
|
metrics = append(metrics, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := schema.JobData{}
|
||||||
|
for _, metric := range metrics {
|
||||||
|
if perscope, ok := jd[metric]; ok {
|
||||||
|
if len(perscope) > 1 {
|
||||||
|
subset := make(map[schema.MetricScope]*schema.JobMetric)
|
||||||
|
for _, scope := range scopes {
|
||||||
|
if jm, ok := perscope[scope]; ok {
|
||||||
|
subset[scope] = jm
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(subset) > 0 {
|
||||||
|
perscope = subset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res[metric] = perscope
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jd = res
|
||||||
|
}
|
||||||
|
size = jd.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
ttl = 5 * time.Hour
|
||||||
|
if job.State == schema.JobStateRunning {
|
||||||
|
ttl = 2 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: Review: Is this really necessary or correct.
|
||||||
|
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
|
||||||
|
// to be available at the scope 'node'. If a job has a lot of nodes,
|
||||||
|
// statisticsSeries should be available so that a min/median/max Graph can be
|
||||||
|
// used instead of a lot of single lines.
|
||||||
|
const maxSeriesSize int = 15
|
||||||
|
for _, scopes := range jd {
|
||||||
|
for _, jm := range scopes {
|
||||||
|
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
jm.AddStatisticsSeries()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeScopeRequested := false
|
||||||
|
for _, scope := range scopes {
|
||||||
|
if scope == schema.MetricScopeNode {
|
||||||
|
nodeScopeRequested = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if nodeScopeRequested {
|
||||||
|
jd.AddNodeScope("flops_any")
|
||||||
|
jd.AddNodeScope("mem_bw")
|
||||||
|
}
|
||||||
|
|
||||||
|
return jd, ttl, size
|
||||||
|
})
|
||||||
|
|
||||||
|
if err, ok := data.(error); ok {
|
||||||
|
log.Error("Error in returned dataset")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return data.(schema.JobData), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
|
||||||
|
func LoadAverages(
|
||||||
|
job *schema.Job,
|
||||||
|
metrics []string,
|
||||||
|
data [][]schema.Float,
|
||||||
|
ctx context.Context,
|
||||||
|
) error {
|
||||||
|
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
|
||||||
|
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
|
||||||
|
}
|
||||||
|
|
||||||
|
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion?
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, m := range metrics {
|
||||||
|
nodes, ok := stats[m]
|
||||||
|
if !ok {
|
||||||
|
data[i] = append(data[i], schema.NaN)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sum := 0.0
|
||||||
|
for _, node := range nodes {
|
||||||
|
sum += node.Avg
|
||||||
|
}
|
||||||
|
data[i] = append(data[i], schema.Float(sum))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used for the node/system view. Returns a map of nodes to a map of metrics.
|
||||||
|
func LoadNodeData(
|
||||||
|
cluster string,
|
||||||
|
metrics, nodes []string,
|
||||||
|
scopes []schema.MetricScope,
|
||||||
|
from, to time.Time,
|
||||||
|
ctx context.Context,
|
||||||
|
) (map[string]map[string][]*schema.JobMetric, error) {
|
||||||
|
repo, err := metricdata.GetMetricDataRepo(cluster)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if metrics == nil {
|
||||||
|
for _, m := range archive.GetCluster(cluster).MetricConfig {
|
||||||
|
metrics = append(metrics, m.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||||
|
if err != nil {
|
||||||
|
if len(data) != 0 {
|
||||||
|
log.Warnf("partial error: %s", err.Error())
|
||||||
|
} else {
|
||||||
|
log.Error("Error while loading node data from metric repository")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if data == nil {
|
||||||
|
return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
@ -8,13 +8,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,10 +32,7 @@ type MetricDataRepository interface {
|
|||||||
|
|
||||||
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
|
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
|
||||||
|
|
||||||
var useArchive bool
|
func Init() error {
|
||||||
|
|
||||||
func Init(disableArchive bool) error {
|
|
||||||
useArchive = !disableArchive
|
|
||||||
for _, cluster := range config.Keys.Clusters {
|
for _, cluster := range config.Keys.Clusters {
|
||||||
if cluster.MetricDataRepository != nil {
|
if cluster.MetricDataRepository != nil {
|
||||||
var kind struct {
|
var kind struct {
|
||||||
@ -73,283 +67,13 @@ func Init(disableArchive bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
|
func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
|
||||||
|
|
||||||
// Fetches the metric data for a job.
|
|
||||||
func LoadData(job *schema.Job,
|
|
||||||
metrics []string,
|
|
||||||
scopes []schema.MetricScope,
|
|
||||||
ctx context.Context,
|
|
||||||
) (schema.JobData, error) {
|
|
||||||
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
|
|
||||||
var jd schema.JobData
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if job.State == schema.JobStateRunning ||
|
|
||||||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
|
|
||||||
!useArchive {
|
|
||||||
|
|
||||||
repo, ok := metricDataRepos[job.Cluster]
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
if scopes == nil {
|
|
||||||
scopes = append(scopes, schema.MetricScopeNode)
|
|
||||||
}
|
|
||||||
|
|
||||||
if metrics == nil {
|
|
||||||
cluster := archive.GetCluster(job.Cluster)
|
|
||||||
for _, mc := range cluster.MetricConfig {
|
|
||||||
metrics = append(metrics, mc.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jd, err = repo.LoadData(job, metrics, scopes, ctx)
|
|
||||||
if err != nil {
|
|
||||||
if len(jd) != 0 {
|
|
||||||
log.Warnf("partial error: %s", err.Error())
|
|
||||||
// return err, 0, 0 // Reactivating will block archiving on one partial error
|
|
||||||
} else {
|
|
||||||
log.Error("Error while loading job data from metric repository")
|
|
||||||
return err, 0, 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
size = jd.Size()
|
|
||||||
} else {
|
|
||||||
jd, err = archive.GetHandle().LoadJobData(job)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error while loading job data from archive")
|
|
||||||
return err, 0, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Avoid sending unrequested data to the client:
|
|
||||||
if metrics != nil || scopes != nil {
|
|
||||||
if metrics == nil {
|
|
||||||
metrics = make([]string, 0, len(jd))
|
|
||||||
for k := range jd {
|
|
||||||
metrics = append(metrics, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res := schema.JobData{}
|
|
||||||
for _, metric := range metrics {
|
|
||||||
if perscope, ok := jd[metric]; ok {
|
|
||||||
if len(perscope) > 1 {
|
|
||||||
subset := make(map[schema.MetricScope]*schema.JobMetric)
|
|
||||||
for _, scope := range scopes {
|
|
||||||
if jm, ok := perscope[scope]; ok {
|
|
||||||
subset[scope] = jm
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(subset) > 0 {
|
|
||||||
perscope = subset
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res[metric] = perscope
|
|
||||||
}
|
|
||||||
}
|
|
||||||
jd = res
|
|
||||||
}
|
|
||||||
size = jd.Size()
|
|
||||||
}
|
|
||||||
|
|
||||||
ttl = 5 * time.Hour
|
|
||||||
if job.State == schema.JobStateRunning {
|
|
||||||
ttl = 2 * time.Minute
|
|
||||||
}
|
|
||||||
|
|
||||||
prepareJobData(jd, scopes)
|
|
||||||
|
|
||||||
return jd, ttl, size
|
|
||||||
})
|
|
||||||
|
|
||||||
if err, ok := data.(error); ok {
|
|
||||||
log.Error("Error in returned dataset")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return data.(schema.JobData), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
|
|
||||||
func LoadAverages(
|
|
||||||
job *schema.Job,
|
|
||||||
metrics []string,
|
|
||||||
data [][]schema.Float,
|
|
||||||
ctx context.Context,
|
|
||||||
) error {
|
|
||||||
if job.State != schema.JobStateRunning && useArchive {
|
|
||||||
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
|
|
||||||
}
|
|
||||||
|
|
||||||
repo, ok := metricDataRepos[job.Cluster]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
|
|
||||||
}
|
|
||||||
|
|
||||||
stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion?
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, m := range metrics {
|
|
||||||
nodes, ok := stats[m]
|
|
||||||
if !ok {
|
|
||||||
data[i] = append(data[i], schema.NaN)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
sum := 0.0
|
|
||||||
for _, node := range nodes {
|
|
||||||
sum += node.Avg
|
|
||||||
}
|
|
||||||
data[i] = append(data[i], schema.Float(sum))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Used for the node/system view. Returns a map of nodes to a map of metrics.
|
|
||||||
func LoadNodeData(
|
|
||||||
cluster string,
|
|
||||||
metrics, nodes []string,
|
|
||||||
scopes []schema.MetricScope,
|
|
||||||
from, to time.Time,
|
|
||||||
ctx context.Context,
|
|
||||||
) (map[string]map[string][]*schema.JobMetric, error) {
|
|
||||||
repo, ok := metricDataRepos[cluster]
|
repo, ok := metricDataRepos[cluster]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
|
err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
if metrics == nil {
|
return repo, err
|
||||||
for _, m := range archive.GetCluster(cluster).MetricConfig {
|
|
||||||
metrics = append(metrics, m.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
|
||||||
if err != nil {
|
|
||||||
if len(data) != 0 {
|
|
||||||
log.Warnf("partial error: %s", err.Error())
|
|
||||||
} else {
|
|
||||||
log.Error("Error while loading node data from metric repository")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if data == nil {
|
|
||||||
return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func cacheKey(
|
|
||||||
job *schema.Job,
|
|
||||||
metrics []string,
|
|
||||||
scopes []schema.MetricScope,
|
|
||||||
) string {
|
|
||||||
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
|
|
||||||
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
|
|
||||||
return fmt.Sprintf("%d(%s):[%v],[%v]",
|
|
||||||
job.ID, job.State, metrics, scopes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
|
|
||||||
// to be available at the scope 'node'. If a job has a lot of nodes,
|
|
||||||
// statisticsSeries should be available so that a min/median/max Graph can be
|
|
||||||
// used instead of a lot of single lines.
|
|
||||||
func prepareJobData(
|
|
||||||
jobData schema.JobData,
|
|
||||||
scopes []schema.MetricScope,
|
|
||||||
) {
|
|
||||||
const maxSeriesSize int = 15
|
|
||||||
for _, scopes := range jobData {
|
|
||||||
for _, jm := range scopes {
|
|
||||||
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
jm.AddStatisticsSeries()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeScopeRequested := false
|
|
||||||
for _, scope := range scopes {
|
|
||||||
if scope == schema.MetricScopeNode {
|
|
||||||
nodeScopeRequested = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if nodeScopeRequested {
|
|
||||||
jobData.AddNodeScope("flops_any")
|
|
||||||
jobData.AddNodeScope("mem_bw")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Writes a running job to the job-archive
|
|
||||||
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
|
||||||
allMetrics := make([]string, 0)
|
|
||||||
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
|
||||||
for _, mc := range metricConfigs {
|
|
||||||
allMetrics = append(allMetrics, mc.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Talk about this! What resolutions to store data at...
|
|
||||||
scopes := []schema.MetricScope{schema.MetricScopeNode}
|
|
||||||
if job.NumNodes <= 8 {
|
|
||||||
scopes = append(scopes, schema.MetricScopeCore)
|
|
||||||
}
|
|
||||||
|
|
||||||
jobData, err := LoadData(job, allMetrics, scopes, ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error wile loading job data for archiving")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
jobMeta := &schema.JobMeta{
|
|
||||||
BaseJob: job.BaseJob,
|
|
||||||
StartTime: job.StartTime.Unix(),
|
|
||||||
Statistics: make(map[string]schema.JobStatistics),
|
|
||||||
}
|
|
||||||
|
|
||||||
for metric, data := range jobData {
|
|
||||||
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
|
|
||||||
nodeData, ok := data["node"]
|
|
||||||
if !ok {
|
|
||||||
// TODO/FIXME: Calc average for non-node metrics as well!
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, series := range nodeData.Series {
|
|
||||||
avg += series.Statistics.Avg
|
|
||||||
min = math.Min(min, series.Statistics.Min)
|
|
||||||
max = math.Max(max, series.Statistics.Max)
|
|
||||||
}
|
|
||||||
|
|
||||||
jobMeta.Statistics[metric] = schema.JobStatistics{
|
|
||||||
Unit: schema.Unit{
|
|
||||||
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
|
|
||||||
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
|
|
||||||
},
|
|
||||||
Avg: avg / float64(job.NumNodes),
|
|
||||||
Min: min,
|
|
||||||
Max: max,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the file based archive is disabled,
|
|
||||||
// only return the JobMeta structure as the
|
|
||||||
// statistics in there are needed.
|
|
||||||
if !useArchive {
|
|
||||||
return jobMeta, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData)
|
|
||||||
}
|
}
|
||||||
|
@ -1,112 +0,0 @@
|
|||||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
|
||||||
// All rights reserved.
|
|
||||||
// Use of this source code is governed by a MIT-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
package repository
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
|
||||||
sq "github.com/Masterminds/squirrel"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Archiving worker thread
|
|
||||||
func (r *JobRepository) archivingWorker() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case job, ok := <-r.archiveChannel:
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
start := time.Now()
|
|
||||||
// not using meta data, called to load JobMeta into Cache?
|
|
||||||
// will fail if job meta not in repository
|
|
||||||
if _, err := r.FetchMetadata(job); err != nil {
|
|
||||||
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
|
|
||||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
|
|
||||||
// TODO: Maybe use context with cancel/timeout here
|
|
||||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
|
|
||||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the jobs database entry one last time:
|
|
||||||
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
|
|
||||||
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
|
||||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
|
||||||
r.archivePending.Done()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop updates the job with the database id jobId using the provided arguments.
|
|
||||||
func (r *JobRepository) MarkArchived(
|
|
||||||
jobMeta *schema.JobMeta,
|
|
||||||
monitoringStatus int32,
|
|
||||||
) error {
|
|
||||||
stmt := sq.Update("job").
|
|
||||||
Set("monitoring_status", monitoringStatus).
|
|
||||||
Where("job.id = ?", jobMeta.JobID)
|
|
||||||
|
|
||||||
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("cannot get subcluster: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
footprint := make(map[string]float64)
|
|
||||||
|
|
||||||
for _, fp := range sc.Footprint {
|
|
||||||
footprint[fp] = LoadJobStat(jobMeta, fp)
|
|
||||||
}
|
|
||||||
|
|
||||||
var rawFootprint []byte
|
|
||||||
|
|
||||||
if rawFootprint, err = json.Marshal(footprint); err != nil {
|
|
||||||
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stmt = stmt.Set("footprint", rawFootprint)
|
|
||||||
|
|
||||||
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
|
||||||
log.Warn("Error while marking job as archived")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
|
|
||||||
stmt := sq.Update("job").
|
|
||||||
Set("monitoring_status", monitoringStatus).
|
|
||||||
Where("job.id = ?", job)
|
|
||||||
|
|
||||||
_, err = stmt.RunWith(r.stmtCache).Exec()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trigger async archiving
|
|
||||||
func (r *JobRepository) TriggerArchiving(job *schema.Job) {
|
|
||||||
r.archivePending.Add(1)
|
|
||||||
r.archiveChannel <- job
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for background thread to finish pending archiving operations
|
|
||||||
func (r *JobRepository) WaitForArchiving() {
|
|
||||||
// close channel and wait for worker to process remaining jobs
|
|
||||||
r.archivePending.Wait()
|
|
||||||
}
|
|
@ -31,9 +31,7 @@ type JobRepository struct {
|
|||||||
DB *sqlx.DB
|
DB *sqlx.DB
|
||||||
stmtCache *sq.StmtCache
|
stmtCache *sq.StmtCache
|
||||||
cache *lrucache.Cache
|
cache *lrucache.Cache
|
||||||
archiveChannel chan *schema.Job
|
|
||||||
driver string
|
driver string
|
||||||
archivePending sync.WaitGroup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetJobRepository() *JobRepository {
|
func GetJobRepository() *JobRepository {
|
||||||
@ -46,10 +44,7 @@ func GetJobRepository() *JobRepository {
|
|||||||
|
|
||||||
stmtCache: sq.NewStmtCache(db.DB),
|
stmtCache: sq.NewStmtCache(db.DB),
|
||||||
cache: lrucache.New(1024 * 1024),
|
cache: lrucache.New(1024 * 1024),
|
||||||
archiveChannel: make(chan *schema.Job, 128),
|
|
||||||
}
|
}
|
||||||
// start archiving worker
|
|
||||||
go jobRepoInstance.archivingWorker()
|
|
||||||
})
|
})
|
||||||
return jobRepoInstance
|
return jobRepoInstance
|
||||||
}
|
}
|
||||||
@ -210,7 +205,10 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
|
if _, err = sq.Update("job").
|
||||||
|
Set("meta_data", job.RawMetaData).
|
||||||
|
Where("job.id = ?", job.ID).
|
||||||
|
RunWith(r.stmtCache).Exec(); err != nil {
|
||||||
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
|
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -458,6 +456,46 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
|
||||||
|
query := sq.Select(jobColumns...).From("job").
|
||||||
|
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
|
||||||
|
Where("job.job_state = 'running'").
|
||||||
|
Where("job.duration>600")
|
||||||
|
|
||||||
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error while running query")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs := make([]*schema.Job, 0, 50)
|
||||||
|
for rows.Next() {
|
||||||
|
job, err := scanJob(rows)
|
||||||
|
if err != nil {
|
||||||
|
rows.Close()
|
||||||
|
log.Warn("Error while scanning rows")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
jobs = append(jobs, job)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Return job count %d", len(jobs))
|
||||||
|
return jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) UpdateDuration() error {
|
||||||
|
stmnt := sq.Update("job").
|
||||||
|
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
|
||||||
|
Where("job_state = 'running'")
|
||||||
|
|
||||||
|
_, err := stmnt.RunWith(r.stmtCache).Exec()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
|
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
|
||||||
var query sq.SelectBuilder
|
var query sq.SelectBuilder
|
||||||
|
|
||||||
@ -495,3 +533,100 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
|||||||
log.Infof("Return job count %d", len(jobs))
|
log.Infof("Return job count %d", len(jobs))
|
||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
|
||||||
|
stmt := sq.Update("job").
|
||||||
|
Set("monitoring_status", monitoringStatus).
|
||||||
|
Where("job.id = ?", job)
|
||||||
|
|
||||||
|
_, err = stmt.RunWith(r.stmtCache).Exec()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
|
||||||
|
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) MarkArchived(
|
||||||
|
stmt sq.UpdateBuilder,
|
||||||
|
monitoringStatus int32,
|
||||||
|
) sq.UpdateBuilder {
|
||||||
|
return stmt.Set("monitoring_status", monitoringStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) UpdateEnergy(
|
||||||
|
stmt sq.UpdateBuilder,
|
||||||
|
jobMeta *schema.JobMeta,
|
||||||
|
) (sq.UpdateBuilder, error) {
|
||||||
|
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||||
|
return stmt, err
|
||||||
|
}
|
||||||
|
energyFootprint := make(map[string]float64)
|
||||||
|
var totalEnergy float64
|
||||||
|
var energy float64
|
||||||
|
|
||||||
|
for _, fp := range sc.EnergyFootprint {
|
||||||
|
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
|
||||||
|
// FIXME: Check for unit conversions
|
||||||
|
if sc.MetricConfig[i].Energy == "power" {
|
||||||
|
energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration)
|
||||||
|
} else if sc.MetricConfig[i].Energy == "energy" {
|
||||||
|
// This assumes the metric is of aggregation type sum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
energyFootprint[fp] = energy
|
||||||
|
totalEnergy += energy
|
||||||
|
}
|
||||||
|
|
||||||
|
var rawFootprint []byte
|
||||||
|
|
||||||
|
if rawFootprint, err = json.Marshal(energyFootprint); err != nil {
|
||||||
|
log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID)
|
||||||
|
return stmt, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt.Set("energy_footprint", rawFootprint).
|
||||||
|
Set("energy", totalEnergy)
|
||||||
|
|
||||||
|
return stmt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) UpdateFootprint(
|
||||||
|
stmt sq.UpdateBuilder,
|
||||||
|
jobMeta *schema.JobMeta,
|
||||||
|
) (sq.UpdateBuilder, error) {
|
||||||
|
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||||
|
return stmt, err
|
||||||
|
}
|
||||||
|
footprint := make(map[string]float64)
|
||||||
|
|
||||||
|
for _, fp := range sc.Footprint {
|
||||||
|
statType := "avg"
|
||||||
|
|
||||||
|
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
|
||||||
|
statType = sc.MetricConfig[i].Footprint
|
||||||
|
}
|
||||||
|
|
||||||
|
name := fmt.Sprintf("%s_%s", fp, statType)
|
||||||
|
footprint[fp] = LoadJobStat(jobMeta, name, statType)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rawFootprint []byte
|
||||||
|
|
||||||
|
if rawFootprint, err = json.Marshal(footprint); err != nil {
|
||||||
|
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
|
||||||
|
return stmt, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt.Set("footprint", rawFootprint)
|
||||||
|
return stmt, nil
|
||||||
|
}
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
ALTER TABLE job DROP energy;
|
||||||
|
ALTER TABLE job DROP energy_footprint;
|
||||||
|
ALTER TABLE job ADD COLUMN flops_any_avg;
|
||||||
|
ALTER TABLE job ADD COLUMN mem_bw_avg;
|
||||||
|
ALTER TABLE job ADD COLUMN mem_used_max;
|
||||||
|
ALTER TABLE job ADD COLUMN load_avg;
|
||||||
|
ALTER TABLE job ADD COLUMN net_bw_avg;
|
||||||
|
ALTER TABLE job ADD COLUMN net_data_vol_total;
|
||||||
|
ALTER TABLE job ADD COLUMN file_bw_avg;
|
||||||
|
ALTER TABLE job ADD COLUMN file_data_vol_total;
|
||||||
|
|
||||||
|
UPDATE job SET flops_any_avg = json_extract(footprint, '$.flops_any_avg');
|
||||||
|
UPDATE job SET mem_bw_avg = json_extract(footprint, '$.mem_bw_avg');
|
||||||
|
UPDATE job SET mem_used_max = json_extract(footprint, '$.mem_used_max');
|
||||||
|
UPDATE job SET load_avg = json_extract(footprint, '$.cpu_load_avg');
|
||||||
|
UPDATE job SET net_bw_avg = json_extract(footprint, '$.net_bw_avg');
|
||||||
|
UPDATE job SET net_data_vol_total = json_extract(footprint, '$.net_data_vol_total');
|
||||||
|
UPDATE job SET file_bw_avg = json_extract(footprint, '$.file_bw_avg');
|
||||||
|
UPDATE job SET file_data_vol_total = json_extract(footprint, '$.file_data_vol_total');
|
||||||
|
|
||||||
|
ALTER TABLE job DROP footprint;
|
@ -1,12 +1,74 @@
|
|||||||
|
-- Remove the legacy indexes. IF EXISTS keeps the migration from aborting on
-- databases where one of them is absent, mirroring the IF NOT EXISTS used
-- for every index created further down.
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_list;
DROP INDEX IF EXISTS job_list_user;
DROP INDEX IF EXISTS job_list_users;
DROP INDEX IF EXISTS job_list_users_start;

-- New energy columns.
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL;

-- New JSON footprint column, seeded with a flops_any_avg key for every row.
ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL;
UPDATE job SET footprint = '{"flops_any_avg": 0.0}';

-- Copy the per-metric columns into the JSON document.
-- Note: load_avg is stored under the JSON key cpu_load_avg.
UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg);
-- Network and file metrics are only inserted for rows with a non-zero value.
UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) WHERE job.net_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) WHERE job.net_data_vol_total != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) WHERE job.file_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) WHERE job.file_data_vol_total != 0;

-- The values now live in footprint; drop the per-metric columns.
ALTER TABLE job DROP flops_any_avg;
ALTER TABLE job DROP mem_bw_avg;
ALTER TABLE job DROP mem_used_max;
ALTER TABLE job DROP load_avg;
ALTER TABLE job DROP net_bw_avg;
ALTER TABLE job DROP net_data_vol_total;
ALTER TABLE job DROP file_bw_avg;
ALTER TABLE job DROP file_data_vol_total;

-- Indexes led by cluster.
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);

-- Indexes led by cluster, partition.
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, partition);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, partition, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, partition, job_state, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, partition, job_state, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, partition, job_state, start_time);

-- Indexes led by cluster, job_state.
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);

-- Indexes led by user.
CREATE INDEX IF NOT EXISTS jobs_user ON job (user);
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (user, start_time);

-- Indexes led by project.
CREATE INDEX IF NOT EXISTS jobs_project ON job (project);
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, user);

-- Indexes led by job_state.
CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state);
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, user);
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster);
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);

-- Array job indexes.
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);

-- Single-column indexes.
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes);
CREATE INDEX IF NOT EXISTS jobs_numhwthreads ON job (num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_numacc ON job (num_acc);

PRAGMA optimize;
|
||||||
|
@ -13,7 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
@ -286,13 +286,17 @@ func (r *JobRepository) JobsStats(
|
|||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Make generic
|
func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 {
|
||||||
func LoadJobStat(job *schema.JobMeta, metric string) float64 {
|
|
||||||
if stats, ok := job.Statistics[metric]; ok {
|
if stats, ok := job.Statistics[metric]; ok {
|
||||||
if metric == "mem_used" {
|
switch statType {
|
||||||
return stats.Max
|
case "avg":
|
||||||
} else {
|
|
||||||
return stats.Avg
|
return stats.Avg
|
||||||
|
case "max":
|
||||||
|
return stats.Max
|
||||||
|
case "min":
|
||||||
|
return stats.Min
|
||||||
|
default:
|
||||||
|
log.Errorf("Unknown stat type %s", statType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -691,7 +695,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
|
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
|
||||||
log.Errorf("Error while loading averages for histogram: %s", err)
|
log.Errorf("Error while loading averages for histogram: %s", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,6 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -18,20 +17,12 @@ type Transaction struct {
|
|||||||
func (r *JobRepository) TransactionInit() (*Transaction, error) {
|
func (r *JobRepository) TransactionInit() (*Transaction, error) {
|
||||||
var err error
|
var err error
|
||||||
t := new(Transaction)
|
t := new(Transaction)
|
||||||
// Inserts are bundled into transactions because in sqlite,
|
|
||||||
// that speeds up inserts A LOT.
|
|
||||||
t.tx, err = r.DB.Beginx()
|
t.tx, err = r.DB.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while bundling transactions")
|
log.Warn("Error while bundling transactions")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error while preparing namedJobInsert")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,7 +41,6 @@ func (r *JobRepository) TransactionCommit(t *Transaction) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.stmt = t.tx.NamedStmt(t.stmt)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,10 +53,14 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
|
func (r *JobRepository) TransactionAddNamed(
|
||||||
res, err := t.stmt.Exec(job)
|
t *Transaction,
|
||||||
|
query string,
|
||||||
|
args ...interface{},
|
||||||
|
) (int64, error) {
|
||||||
|
res, err := t.tx.NamedExec(query, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("repository initDB(): %v", err)
|
log.Errorf("Named Exec failed: %v", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,26 +73,14 @@ func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, e
|
|||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
|
func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...interface{}) (int64, error) {
|
||||||
res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
|
res := t.tx.MustExec(query, args)
|
||||||
|
|
||||||
|
id, err := res.LastInsertId()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
tagId, err := res.LastInsertId()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error while getting last insert ID")
|
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return tagId, nil
|
return id, nil
|
||||||
}
|
|
||||||
|
|
||||||
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
|
|
||||||
if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
|
|
||||||
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
|
||||||
// All rights reserved.
|
|
||||||
// Use of this source code is governed by a MIT-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
package taskManager
|
|
||||||
|
|
||||||
func registerFootprintWorker() {
|
|
||||||
}
|
|
@ -79,6 +79,9 @@ func Start() {
|
|||||||
RegisterLdapSyncService(lc.SyncInterval)
|
RegisterLdapSyncService(lc.SyncInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RegisterFootprintWorker()
|
||||||
|
RegisterUpdateDurationWorker()
|
||||||
|
|
||||||
s.Start()
|
s.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
26
internal/taskManager/updateDurationService.go
Normal file
26
internal/taskManager/updateDurationService.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package taskManager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/go-co-op/gocron/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RegisterUpdateDurationWorker() {
|
||||||
|
log.Info("Register duration update service")
|
||||||
|
|
||||||
|
d, _ := time.ParseDuration("5m")
|
||||||
|
s.NewJob(gocron.DurationJob(d),
|
||||||
|
gocron.NewTask(
|
||||||
|
func() {
|
||||||
|
start := time.Now()
|
||||||
|
log.Printf("Update duration started at %s", start.Format(time.RFC3339))
|
||||||
|
jobRepo.UpdateDuration()
|
||||||
|
log.Printf("Update duration is done and took %s", time.Since(start))
|
||||||
|
}))
|
||||||
|
}
|
117
internal/taskManager/updateFootprintService.go
Normal file
117
internal/taskManager/updateFootprintService.go
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package taskManager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
sq "github.com/Masterminds/squirrel"
|
||||||
|
"github.com/go-co-op/gocron/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RegisterFootprintWorker() {
|
||||||
|
log.Info("Register Footprint Update service")
|
||||||
|
d, _ := time.ParseDuration("10m")
|
||||||
|
s.NewJob(gocron.DurationJob(d),
|
||||||
|
gocron.NewTask(
|
||||||
|
func() {
|
||||||
|
s := time.Now()
|
||||||
|
log.Printf("Update Footprints started at %s", s.Format(time.RFC3339))
|
||||||
|
|
||||||
|
t, err := jobRepo.TransactionInit()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed TransactionInit %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cluster := range archive.Clusters {
|
||||||
|
jobs, err := jobRepo.FindRunningJobs(cluster.Name)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allMetrics := make([]string, 0)
|
||||||
|
metricConfigs := archive.GetCluster(cluster.Name).MetricConfig
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
allMetrics = append(allMetrics, mc.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
scopes := []schema.MetricScope{schema.MetricScopeNode}
|
||||||
|
scopes = append(scopes, schema.MetricScopeCore)
|
||||||
|
scopes = append(scopes, schema.MetricScopeAccelerator)
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background())
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error wile loading job data for footprint update")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
jobMeta := &schema.JobMeta{
|
||||||
|
BaseJob: job.BaseJob,
|
||||||
|
StartTime: job.StartTime.Unix(),
|
||||||
|
Statistics: make(map[string]schema.JobStatistics),
|
||||||
|
}
|
||||||
|
|
||||||
|
for metric, data := range jobData {
|
||||||
|
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
|
||||||
|
nodeData, ok := data["node"]
|
||||||
|
if !ok {
|
||||||
|
// This should never happen ?
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, series := range nodeData.Series {
|
||||||
|
avg += series.Statistics.Avg
|
||||||
|
min = math.Min(min, series.Statistics.Min)
|
||||||
|
max = math.Max(max, series.Statistics.Max)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobMeta.Statistics[metric] = schema.JobStatistics{
|
||||||
|
Unit: schema.Unit{
|
||||||
|
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
|
||||||
|
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
|
||||||
|
},
|
||||||
|
Avg: avg / float64(job.NumNodes),
|
||||||
|
Min: min,
|
||||||
|
Max: max,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt := sq.Update("job").Where("job.id = ?", job.ID)
|
||||||
|
stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
query, args, err := stmt.ToSql()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed in ToSQL conversion %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jobRepo.TransactionAdd(t, query, args)
|
||||||
|
// if err := jobRepo.Execute(stmt); err != nil {
|
||||||
|
// log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
jobRepo.TransactionCommit(t)
|
||||||
|
}
|
||||||
|
jobRepo.TransactionEnd(t)
|
||||||
|
log.Printf("Update Footprints is done and took %s", time.Since(s))
|
||||||
|
}))
|
||||||
|
}
|
@ -88,7 +88,7 @@ func initClusterConfig() error {
|
|||||||
sc.Footprint = append(sc.Footprint, newMetric.Name)
|
sc.Footprint = append(sc.Footprint, newMetric.Name)
|
||||||
ml.Footprint = newMetric.Footprint
|
ml.Footprint = newMetric.Footprint
|
||||||
}
|
}
|
||||||
if newMetric.Energy {
|
if newMetric.Energy != "" {
|
||||||
sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name)
|
sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,7 +99,7 @@ func initClusterConfig() error {
|
|||||||
if newMetric.Footprint != "" {
|
if newMetric.Footprint != "" {
|
||||||
sc.Footprint = append(sc.Footprint, newMetric.Name)
|
sc.Footprint = append(sc.Footprint, newMetric.Name)
|
||||||
}
|
}
|
||||||
if newMetric.Energy {
|
if newMetric.Energy != "" {
|
||||||
sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name)
|
sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -221,3 +221,13 @@ func GetSubClusterByNode(cluster, hostname string) (string, error) {
|
|||||||
|
|
||||||
return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname)
|
return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func MetricIndex(mc []schema.MetricConfig, name string) (int, error) {
|
||||||
|
for i, m := range mc {
|
||||||
|
if m.Name == name {
|
||||||
|
return i, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, fmt.Errorf("Unknown metric name %s", name)
|
||||||
|
}
|
||||||
|
@ -94,7 +94,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "hwthread",
|
"scope": "hwthread",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
"energy": true,
|
"energy": "power",
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 500,
|
"peak": 500,
|
||||||
"normal": 250,
|
"normal": 250,
|
||||||
@ -136,7 +136,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "accelerator",
|
"scope": "accelerator",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
"energy": true,
|
"energy": "power",
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 400,
|
"peak": 400,
|
||||||
"normal": 200,
|
"normal": 200,
|
||||||
@ -190,7 +190,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "socket",
|
"scope": "socket",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
"energy": true,
|
"energy": "power",
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 500,
|
"peak": 500,
|
||||||
"normal": 250,
|
"normal": 250,
|
||||||
|
@ -256,7 +256,7 @@
|
|||||||
"normal": 250,
|
"normal": 250,
|
||||||
"caution": 100,
|
"caution": 100,
|
||||||
"alert": 50,
|
"alert": 50,
|
||||||
"energy": true
|
"energy": "power"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "mem_power",
|
"name": "mem_power",
|
||||||
@ -270,7 +270,7 @@
|
|||||||
"normal": 50,
|
"normal": 50,
|
||||||
"caution": 20,
|
"caution": 20,
|
||||||
"alert": 10,
|
"alert": 10,
|
||||||
"energy": true
|
"energy": "power"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "ipc",
|
"name": "ipc",
|
||||||
|
@ -47,14 +47,14 @@ type SubCluster struct {
|
|||||||
|
|
||||||
type SubClusterConfig struct {
|
type SubClusterConfig struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
Footprint string `json:"footprint,omitempty"`
|
||||||
Peak float64 `json:"peak"`
|
Peak float64 `json:"peak"`
|
||||||
Normal float64 `json:"normal"`
|
Normal float64 `json:"normal"`
|
||||||
Caution float64 `json:"caution"`
|
Caution float64 `json:"caution"`
|
||||||
Alert float64 `json:"alert"`
|
Alert float64 `json:"alert"`
|
||||||
Footprint string `json:"footprint,omitempty"`
|
|
||||||
Remove bool `json:"remove"`
|
Remove bool `json:"remove"`
|
||||||
LowerIsBetter bool `json:"lowerIsBetter"`
|
LowerIsBetter bool `json:"lowerIsBetter"`
|
||||||
Energy bool `json:"energy"`
|
Energy string `json:"energy"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricConfig struct {
|
type MetricConfig struct {
|
||||||
@ -62,15 +62,15 @@ type MetricConfig struct {
|
|||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Scope MetricScope `json:"scope"`
|
Scope MetricScope `json:"scope"`
|
||||||
Aggregation string `json:"aggregation"`
|
Aggregation string `json:"aggregation"`
|
||||||
|
Footprint string `json:"footprint,omitempty"`
|
||||||
SubClusters []*SubClusterConfig `json:"subClusters,omitempty"`
|
SubClusters []*SubClusterConfig `json:"subClusters,omitempty"`
|
||||||
Timestep int `json:"timestep"`
|
|
||||||
Peak float64 `json:"peak"`
|
Peak float64 `json:"peak"`
|
||||||
Normal float64 `json:"normal"`
|
Normal float64 `json:"normal"`
|
||||||
Caution float64 `json:"caution"`
|
Caution float64 `json:"caution"`
|
||||||
Alert float64 `json:"alert"`
|
Alert float64 `json:"alert"`
|
||||||
|
Timestep int `json:"timestep"`
|
||||||
LowerIsBetter bool `json:"lowerIsBetter"`
|
LowerIsBetter bool `json:"lowerIsBetter"`
|
||||||
Footprint string `json:"footprint,omitempty"`
|
Energy string `json:"energy"`
|
||||||
Energy bool `json:"energy"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
|
@ -32,7 +32,7 @@ type BaseJob struct {
|
|||||||
Footprint map[string]float64 `json:"footprint"`
|
Footprint map[string]float64 `json:"footprint"`
|
||||||
MetaData map[string]string `json:"metaData"`
|
MetaData map[string]string `json:"metaData"`
|
||||||
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
|
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
|
||||||
Energy float64 `json:"energy"`
|
Energy float64 `json:"energy" db:"energy"`
|
||||||
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
|
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
|
||||||
Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
|
Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
|
||||||
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
|
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
|
||||||
|
@ -39,6 +39,27 @@
|
|||||||
"avg"
|
"avg"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
"footprint": {
|
||||||
|
"description": "Is it a footprint metric and what type",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"avg",
|
||||||
|
"max",
|
||||||
|
"min"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"energy": {
|
||||||
|
"description": "Is it used to calculate job energy",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"power",
|
||||||
|
"energy"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"lowerIsBetter": {
|
||||||
|
"description": "Is lower better.",
|
||||||
|
"type": "boolean"
|
||||||
|
},
|
||||||
"peak": {
|
"peak": {
|
||||||
"description": "Metric peak threshold (Upper metric limit)",
|
"description": "Metric peak threshold (Upper metric limit)",
|
||||||
"type": "number"
|
"type": "number"
|
||||||
@ -65,6 +86,27 @@
|
|||||||
"description": "Hardware partition name",
|
"description": "Hardware partition name",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
|
"footprint": {
|
||||||
|
"description": "Is it a footprint metric and what type. Overwrite global setting",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"avg",
|
||||||
|
"max",
|
||||||
|
"min"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"energy": {
|
||||||
|
"description": "Is it used to calculate job energy. Overwrite global",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"power",
|
||||||
|
"energy"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"lowerIsBetter": {
|
||||||
|
"description": "Is lower better. Overwrite global",
|
||||||
|
"type": "boolean"
|
||||||
|
},
|
||||||
"peak": {
|
"peak": {
|
||||||
"type": "number"
|
"type": "number"
|
||||||
},
|
},
|
||||||
@ -78,6 +120,7 @@
|
|||||||
"type": "number"
|
"type": "number"
|
||||||
},
|
},
|
||||||
"remove": {
|
"remove": {
|
||||||
|
"description": "Remove this metric for this subcluster",
|
||||||
"type": "boolean"
|
"type": "boolean"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -67,8 +67,9 @@
|
|||||||
export let height = "310px";
|
export let height = "310px";
|
||||||
|
|
||||||
const footprintData = job?.footprint?.map((jf) => {
|
const footprintData = job?.footprint?.map((jf) => {
|
||||||
// Unit
|
|
||||||
const fmc = getContext("getMetricConfig")(job.cluster, job.subCluster, jf.name);
|
const fmc = getContext("getMetricConfig")(job.cluster, job.subCluster, jf.name);
|
||||||
|
if (fmc) {
|
||||||
|
// Unit
|
||||||
const unit = (fmc?.unit?.prefix ? fmc.unit.prefix : "") + (fmc?.unit?.base ? fmc.unit.base : "")
|
const unit = (fmc?.unit?.prefix ? fmc.unit.prefix : "") + (fmc?.unit?.base ? fmc.unit.base : "")
|
||||||
|
|
||||||
// Threshold / -Differences
|
// Threshold / -Differences
|
||||||
@ -122,7 +123,18 @@
|
|||||||
impact: -1,
|
impact: -1,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
});
|
} else { // No matching metric config: display as single value
|
||||||
|
return {
|
||||||
|
name: jf.name + ' (' + jf.stat + ')',
|
||||||
|
avg: jf.value,
|
||||||
|
message:
|
||||||
|
`No config for metric ${jf.name} found.`,
|
||||||
|
impact: 4,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}).sort(function (a, b) { // Sort by impact value primarily, within impact sort name alphabetically
|
||||||
|
return a.impact - b.impact || ((a.name > b.name) ? 1 : ((b.name > a.name) ? -1 : 0));
|
||||||
|
});;
|
||||||
|
|
||||||
function evalFootprint(mean, thresholds, lowerIsBetter, level) {
|
function evalFootprint(mean, thresholds, lowerIsBetter, level) {
|
||||||
// Handle Metrics in which less value is better
|
// Handle Metrics in which less value is better
|
||||||
@ -159,6 +171,7 @@
|
|||||||
{/if}
|
{/if}
|
||||||
<CardBody>
|
<CardBody>
|
||||||
{#each footprintData as fpd, index}
|
{#each footprintData as fpd, index}
|
||||||
|
{#if fpd.impact !== 4}
|
||||||
<div class="mb-1 d-flex justify-content-between">
|
<div class="mb-1 d-flex justify-content-between">
|
||||||
<div> <b>{fpd.name}</b></div>
|
<div> <b>{fpd.name}</b></div>
|
||||||
<!-- For symmetry, see below ...-->
|
<!-- For symmetry, see below ...-->
|
||||||
@ -213,6 +226,29 @@
|
|||||||
</Col>
|
</Col>
|
||||||
{/if}
|
{/if}
|
||||||
</Row>
|
</Row>
|
||||||
|
{:else}
|
||||||
|
<div class="mb-1 d-flex justify-content-between">
|
||||||
|
<div>
|
||||||
|
<b>{fpd.name}</b>
|
||||||
|
</div>
|
||||||
|
<div
|
||||||
|
class="cursor-help d-inline-flex"
|
||||||
|
id={`footprint-${job.jobId}-${index}`}
|
||||||
|
>
|
||||||
|
<div class="mx-1">
|
||||||
|
<Icon name="info-circle"/>
|
||||||
|
</div>
|
||||||
|
<div>
|
||||||
|
{fpd.avg}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<Tooltip
|
||||||
|
target={`footprint-${job.jobId}-${index}`}
|
||||||
|
placement="right"
|
||||||
|
offset={[0, 20]}>{fpd.message}</Tooltip
|
||||||
|
>
|
||||||
|
{/if}
|
||||||
{/each}
|
{/each}
|
||||||
{#if job?.metaData?.message}
|
{#if job?.metaData?.message}
|
||||||
<hr class="mt-1 mb-2" />
|
<hr class="mt-1 mb-2" />
|
||||||
|
@ -309,15 +309,13 @@ export function checkMetricDisabled(m, c, s) { //[m]etric, [c]luster, [s]ubclust
|
|||||||
|
|
||||||
export function getStatsItems() {
|
export function getStatsItems() {
|
||||||
// console.time('stats')
|
// console.time('stats')
|
||||||
// console.log('getStatsItems ...')
|
|
||||||
const globalMetrics = getContext("globalMetrics")
|
const globalMetrics = getContext("globalMetrics")
|
||||||
const result = globalMetrics.map((gm) => {
|
const result = globalMetrics.map((gm) => {
|
||||||
if (gm?.footprint) {
|
if (gm?.footprint) {
|
||||||
// Footprint contains suffix naming the used stat-type
|
|
||||||
// console.time('deep')
|
// console.time('deep')
|
||||||
// console.log('Deep Config for', gm.name)
|
|
||||||
const mc = getMetricConfigDeep(gm.name, null, null)
|
const mc = getMetricConfigDeep(gm.name, null, null)
|
||||||
// console.timeEnd('deep')
|
// console.timeEnd('deep')
|
||||||
|
if (mc) {
|
||||||
return {
|
return {
|
||||||
field: gm.name + '_' + gm.footprint,
|
field: gm.name + '_' + gm.footprint,
|
||||||
text: gm.name + ' (' + gm.footprint + ')',
|
text: gm.name + ' (' + gm.footprint + ')',
|
||||||
@ -328,6 +326,7 @@ export function getStatsItems() {
|
|||||||
enabled: false
|
enabled: false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return null
|
return null
|
||||||
}).filter((r) => r != null)
|
}).filter((r) => r != null)
|
||||||
// console.timeEnd('stats')
|
// console.timeEnd('stats')
|
||||||
@ -336,11 +335,9 @@ export function getStatsItems() {
|
|||||||
|
|
||||||
export function getSortItems() {
|
export function getSortItems() {
|
||||||
//console.time('sort')
|
//console.time('sort')
|
||||||
//console.log('getSortItems ...')
|
|
||||||
const globalMetrics = getContext("globalMetrics")
|
const globalMetrics = getContext("globalMetrics")
|
||||||
const result = globalMetrics.map((gm) => {
|
const result = globalMetrics.map((gm) => {
|
||||||
if (gm?.footprint) {
|
if (gm?.footprint) {
|
||||||
// Footprint contains suffix naming the used stat-type
|
|
||||||
return {
|
return {
|
||||||
field: gm.name + '_' + gm.footprint,
|
field: gm.name + '_' + gm.footprint,
|
||||||
type: 'foot',
|
type: 'foot',
|
||||||
@ -357,21 +354,22 @@ export function getSortItems() {
|
|||||||
function getMetricConfigDeep(metric, cluster, subCluster) {
|
function getMetricConfigDeep(metric, cluster, subCluster) {
|
||||||
const clusters = getContext("clusters");
|
const clusters = getContext("clusters");
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
let c = clusters.find((c) => c.name == cluster);
|
const c = clusters.find((c) => c.name == cluster);
|
||||||
if (subCluster != null) {
|
if (subCluster != null) {
|
||||||
let sc = c.subClusters.find((sc) => sc.name == subCluster);
|
const sc = c.subClusters.find((sc) => sc.name == subCluster);
|
||||||
return sc.metricConfig.find((mc) => mc.name == metric)
|
return sc.metricConfig.find((mc) => mc.name == metric)
|
||||||
} else {
|
} else {
|
||||||
let result;
|
let result;
|
||||||
for (let sc of c.subClusters) {
|
for (let sc of c.subClusters) {
|
||||||
const mc = sc.metricConfig.find((mc) => mc.name == metric)
|
const mc = sc.metricConfig.find((mc) => mc.name == metric)
|
||||||
if (result) { // If lowerIsBetter: Peak is still maximum value, no special case required
|
if (result && mc) { // update result; If lowerIsBetter: Peak is still maximum value, no special case required
|
||||||
result.alert = (mc.alert > result.alert) ? mc.alert : result.alert
|
result.alert = (mc.alert > result.alert) ? mc.alert : result.alert
|
||||||
result.caution = (mc.caution > result.caution) ? mc.caution : result.caution
|
result.caution = (mc.caution > result.caution) ? mc.caution : result.caution
|
||||||
result.normal = (mc.normal > result.normal) ? mc.normal : result.normal
|
result.normal = (mc.normal > result.normal) ? mc.normal : result.normal
|
||||||
result.peak = (mc.peak > result.peak) ? mc.peak : result.peak
|
result.peak = (mc.peak > result.peak) ? mc.peak : result.peak
|
||||||
} else {
|
} else if (mc) {
|
||||||
if (mc) result = {...mc};
|
// start new result
|
||||||
|
result = {...mc};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
@ -381,13 +379,14 @@ function getMetricConfigDeep(metric, cluster, subCluster) {
|
|||||||
for (let c of clusters) {
|
for (let c of clusters) {
|
||||||
for (let sc of c.subClusters) {
|
for (let sc of c.subClusters) {
|
||||||
const mc = sc.metricConfig.find((mc) => mc.name == metric)
|
const mc = sc.metricConfig.find((mc) => mc.name == metric)
|
||||||
if (result) { // If lowerIsBetter: Peak is still maximum value, no special case required
|
if (result && mc) { // update result; If lowerIsBetter: Peak is still maximum value, no special case required
|
||||||
result.alert = (mc.alert > result.alert) ? mc.alert : result.alert
|
result.alert = (mc.alert > result.alert) ? mc.alert : result.alert
|
||||||
result.caution = (mc.caution > result.caution) ? mc.caution : result.caution
|
result.caution = (mc.caution > result.caution) ? mc.caution : result.caution
|
||||||
result.normal = (mc.normal > result.normal) ? mc.normal : result.normal
|
result.normal = (mc.normal > result.normal) ? mc.normal : result.normal
|
||||||
result.peak = (mc.peak > result.peak) ? mc.peak : result.peak
|
result.peak = (mc.peak > result.peak) ? mc.peak : result.peak
|
||||||
} else {
|
} else if (mc) {
|
||||||
if (mc) result = {...mc};
|
// Start new result
|
||||||
|
result = {...mc};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user