mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-26 17:47:31 +01:00
@@ -429,7 +429,7 @@ type TimeRangeOutput {
|
|||||||
input NodeFilter {
|
input NodeFilter {
|
||||||
hostname: StringInput
|
hostname: StringInput
|
||||||
cluster: StringInput
|
cluster: StringInput
|
||||||
subcluster: StringInput
|
subCluster: StringInput
|
||||||
schedulerState: SchedulerState
|
schedulerState: SchedulerState
|
||||||
healthState: MonitoringState
|
healthState: MonitoringState
|
||||||
timeStart: Int
|
timeStart: Int
|
||||||
@@ -444,6 +444,7 @@ input JobFilter {
|
|||||||
project: StringInput
|
project: StringInput
|
||||||
jobName: StringInput
|
jobName: StringInput
|
||||||
cluster: StringInput
|
cluster: StringInput
|
||||||
|
subCluster: StringInput
|
||||||
partition: StringInput
|
partition: StringInput
|
||||||
duration: IntRange
|
duration: IntRange
|
||||||
energy: FloatRange
|
energy: FloatRange
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -4,8 +4,6 @@ github.com/99designs/gqlgen v0.17.85 h1:EkGx3U2FDcxQm8YDLQSpXIAVmpDyZ3IcBMOJi2nH
|
|||||||
github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU=
|
github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU=
|
||||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
|
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
|
||||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
|
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.0 h1:gqMsh7zsJMUhaXviXzaZ3gqXcLVgerjRJHzIcwX4FmQ=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims=
|
github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
|
github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
|
||||||
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
||||||
|
|||||||
@@ -691,7 +691,7 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
// Check if jobs are within the same day (prevent duplicates)
|
// Check if jobs are within the same day (prevent duplicates)
|
||||||
if (req.StartTime - job.StartTime) < secondsPerDay {
|
if (req.StartTime - job.StartTime) < secondsPerDay {
|
||||||
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", *job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -860,7 +860,7 @@ func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
|
|||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
|
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
|
||||||
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
|
Message: fmt.Sprintf("Successfully deleted job %d", *job.ID),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
cclog.Errorf("Failed to encode response: %v", err)
|
cclog.Errorf("Failed to encode response: %v", err)
|
||||||
}
|
}
|
||||||
@@ -926,17 +926,17 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
|||||||
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
|
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
|
||||||
// Sanity checks
|
// Sanity checks
|
||||||
if job.State != schema.JobStateRunning {
|
if job.State != schema.JobStateRunning {
|
||||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if job.StartTime > req.StopTime {
|
if job.StartTime > req.StopTime {
|
||||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, *job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.State != "" && !req.State.Valid() {
|
if req.State != "" && !req.State.Valid() {
|
||||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, *job.ID, job.Cluster, req.State), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
} else if req.State == "" {
|
} else if req.State == "" {
|
||||||
req.State = schema.JobStateCompleted
|
req.State = schema.JobStateCompleted
|
||||||
@@ -950,12 +950,12 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
|
|
||||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||||
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
||||||
|
|
||||||
// Send a response (with status OK). This means that errors that happen from here on forward
|
// Send a response (with status OK). This means that errors that happen from here on forward
|
||||||
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
|
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
|
||||||
|
|||||||
@@ -100,8 +100,8 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
||||||
startDb := time.Now()
|
startDB := time.Now()
|
||||||
|
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
state := determineState(node.States)
|
state := determineState(node.States)
|
||||||
@@ -122,5 +122,5 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState)
|
repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState)
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDb))
|
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ func archivingWorker() {
|
|||||||
// not using meta data, called to load JobMeta into Cache?
|
// not using meta data, called to load JobMeta into Cache?
|
||||||
// will fail if job meta not in repository
|
// will fail if job meta not in repository
|
||||||
if _, err := jobRepo.FetchMetadata(job); err != nil {
|
if _, err := jobRepo.FetchMetadata(job); err != nil {
|
||||||
cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
|
cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", *job.ID, err.Error())
|
||||||
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
|
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
continue
|
continue
|
||||||
@@ -136,7 +136,7 @@ func archivingWorker() {
|
|||||||
// Use shutdown context to allow cancellation
|
// Use shutdown context to allow cancellation
|
||||||
jobMeta, err := ArchiveJob(job, shutdownCtx)
|
jobMeta, err := ArchiveJob(job, shutdownCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
|
cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", *job.ID, err.Error())
|
||||||
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
|
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
continue
|
continue
|
||||||
@@ -145,24 +145,24 @@ func archivingWorker() {
|
|||||||
stmt := sq.Update("job").Where("job.id = ?", job.ID)
|
stmt := sq.Update("job").Where("job.id = ?", job.ID)
|
||||||
|
|
||||||
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
|
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
|
||||||
cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
|
cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", *job.ID, err.Error())
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
|
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
|
||||||
cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
|
cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", *job.ID, err.Error())
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Update the jobs database entry one last time:
|
// Update the jobs database entry one last time:
|
||||||
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
|
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
|
||||||
if err := jobRepo.Execute(stmt); err != nil {
|
if err := jobRepo.Execute(stmt); err != nil {
|
||||||
cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
|
cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", *job.ID, err.Error())
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
||||||
cclog.Infof("archiving job (dbid: %d) successful", job.ID)
|
cclog.Infof("archiving job (dbid: %d) successful", *job.ID)
|
||||||
|
|
||||||
repository.CallJobStopHooks(job)
|
repository.CallJobStopHooks(job)
|
||||||
archivePending.Done()
|
archivePending.Done()
|
||||||
|
|||||||
@@ -2712,7 +2712,7 @@ type TimeRangeOutput {
|
|||||||
input NodeFilter {
|
input NodeFilter {
|
||||||
hostname: StringInput
|
hostname: StringInput
|
||||||
cluster: StringInput
|
cluster: StringInput
|
||||||
subcluster: StringInput
|
subCluster: StringInput
|
||||||
schedulerState: SchedulerState
|
schedulerState: SchedulerState
|
||||||
healthState: MonitoringState
|
healthState: MonitoringState
|
||||||
timeStart: Int
|
timeStart: Int
|
||||||
@@ -2727,6 +2727,7 @@ input JobFilter {
|
|||||||
project: StringInput
|
project: StringInput
|
||||||
jobName: StringInput
|
jobName: StringInput
|
||||||
cluster: StringInput
|
cluster: StringInput
|
||||||
|
subCluster: StringInput
|
||||||
partition: StringInput
|
partition: StringInput
|
||||||
duration: IntRange
|
duration: IntRange
|
||||||
energy: FloatRange
|
energy: FloatRange
|
||||||
@@ -13199,7 +13200,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any
|
|||||||
asMap[k] = v
|
asMap[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "shared", "schedule", "node"}
|
fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "subCluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "shared", "schedule", "node"}
|
||||||
for _, k := range fieldsInOrder {
|
for _, k := range fieldsInOrder {
|
||||||
v, ok := asMap[k]
|
v, ok := asMap[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -13262,6 +13263,13 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any
|
|||||||
return it, err
|
return it, err
|
||||||
}
|
}
|
||||||
it.Cluster = data
|
it.Cluster = data
|
||||||
|
case "subCluster":
|
||||||
|
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subCluster"))
|
||||||
|
data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||||
|
if err != nil {
|
||||||
|
return it, err
|
||||||
|
}
|
||||||
|
it.SubCluster = data
|
||||||
case "partition":
|
case "partition":
|
||||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("partition"))
|
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("partition"))
|
||||||
data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||||
@@ -13400,7 +13408,7 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an
|
|||||||
asMap[k] = v
|
asMap[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
fieldsInOrder := [...]string{"hostname", "cluster", "subcluster", "schedulerState", "healthState", "timeStart"}
|
fieldsInOrder := [...]string{"hostname", "cluster", "subCluster", "schedulerState", "healthState", "timeStart"}
|
||||||
for _, k := range fieldsInOrder {
|
for _, k := range fieldsInOrder {
|
||||||
v, ok := asMap[k]
|
v, ok := asMap[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -13421,13 +13429,13 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an
|
|||||||
return it, err
|
return it, err
|
||||||
}
|
}
|
||||||
it.Cluster = data
|
it.Cluster = data
|
||||||
case "subcluster":
|
case "subCluster":
|
||||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subcluster"))
|
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subCluster"))
|
||||||
data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return it, err
|
return it, err
|
||||||
}
|
}
|
||||||
it.Subcluster = data
|
it.SubCluster = data
|
||||||
case "schedulerState":
|
case "schedulerState":
|
||||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("schedulerState"))
|
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("schedulerState"))
|
||||||
data, err := ec.unmarshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋv2ᚋschemaᚐSchedulerState(ctx, v)
|
data, err := ec.unmarshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋv2ᚋschemaᚐSchedulerState(ctx, v)
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ type JobFilter struct {
|
|||||||
Project *StringInput `json:"project,omitempty"`
|
Project *StringInput `json:"project,omitempty"`
|
||||||
JobName *StringInput `json:"jobName,omitempty"`
|
JobName *StringInput `json:"jobName,omitempty"`
|
||||||
Cluster *StringInput `json:"cluster,omitempty"`
|
Cluster *StringInput `json:"cluster,omitempty"`
|
||||||
|
SubCluster *StringInput `json:"subCluster,omitempty"`
|
||||||
Partition *StringInput `json:"partition,omitempty"`
|
Partition *StringInput `json:"partition,omitempty"`
|
||||||
Duration *config.IntRange `json:"duration,omitempty"`
|
Duration *config.IntRange `json:"duration,omitempty"`
|
||||||
Energy *FloatRange `json:"energy,omitempty"`
|
Energy *FloatRange `json:"energy,omitempty"`
|
||||||
@@ -186,7 +187,7 @@ type NamedStatsWithScope struct {
|
|||||||
type NodeFilter struct {
|
type NodeFilter struct {
|
||||||
Hostname *StringInput `json:"hostname,omitempty"`
|
Hostname *StringInput `json:"hostname,omitempty"`
|
||||||
Cluster *StringInput `json:"cluster,omitempty"`
|
Cluster *StringInput `json:"cluster,omitempty"`
|
||||||
Subcluster *StringInput `json:"subcluster,omitempty"`
|
SubCluster *StringInput `json:"subCluster,omitempty"`
|
||||||
SchedulerState *schema.SchedulerState `json:"schedulerState,omitempty"`
|
SchedulerState *schema.SchedulerState `json:"schedulerState,omitempty"`
|
||||||
HealthState *string `json:"healthState,omitempty"`
|
HealthState *string `json:"healthState,omitempty"`
|
||||||
TimeStart *int `json:"timeStart,omitempty"`
|
TimeStart *int `json:"timeStart,omitempty"`
|
||||||
|
|||||||
@@ -57,13 +57,13 @@ func (r *queryResolver) rooflineHeatmap(
|
|||||||
|
|
||||||
jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
|
jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warnf("Error while loading roofline metrics for job %d", job.ID)
|
cclog.Warnf("Error while loading roofline metrics for job %d", *job.ID)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
|
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
|
||||||
if flops_ == nil && membw_ == nil {
|
if flops_ == nil && membw_ == nil {
|
||||||
cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", job.ID)
|
cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", *job.ID)
|
||||||
continue
|
continue
|
||||||
// return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
|
// return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -216,7 +216,7 @@ func enrichJobMetadata(job *schema.Job) error {
|
|||||||
metricEnergy = math.Round(rawEnergy*100.0) / 100.0
|
metricEnergy = math.Round(rawEnergy*100.0) / 100.0
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID)
|
cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, *job.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
job.EnergyFootprint[fp] = metricEnergy
|
job.EnergyFootprint[fp] = metricEnergy
|
||||||
@@ -225,7 +225,7 @@ func enrichJobMetadata(job *schema.Job) error {
|
|||||||
|
|
||||||
job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
|
job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
|
||||||
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
|
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
|
||||||
cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID)
|
cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", *job.ID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ func cacheKey(
|
|||||||
resolution int,
|
resolution int,
|
||||||
) string {
|
) string {
|
||||||
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
|
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
|
||||||
job.ID, job.State, metrics, scopes, resolution)
|
*job.ID, job.State, metrics, scopes, resolution)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,
|
// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,
|
||||||
|
|||||||
@@ -149,7 +149,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
// Similar to buildQueries but uses full node topology instead of job-allocated resources.
|
// Similar to buildQueries but uses full node topology instead of job-allocated resources.
|
||||||
//
|
//
|
||||||
// The function handles:
|
// The function handles:
|
||||||
// - Subcluster topology resolution (either pre-loaded or per-node lookup)
|
// - SubCluster topology resolution (either pre-loaded or per-node lookup)
|
||||||
// - Full node hardware thread lists (not job-specific subsets)
|
// - Full node hardware thread lists (not job-specific subsets)
|
||||||
// - All accelerators on each node
|
// - All accelerators on each node
|
||||||
// - Metric configuration validation with subcluster filtering
|
// - Metric configuration validation with subcluster filtering
|
||||||
|
|||||||
@@ -229,7 +229,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
cachekey := fmt.Sprintf("metadata:%d", job.ID)
|
cachekey := fmt.Sprintf("metadata:%d", *job.ID)
|
||||||
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
||||||
job.MetaData = cached.(map[string]string)
|
job.MetaData = cached.(map[string]string)
|
||||||
return job.MetaData, nil
|
return job.MetaData, nil
|
||||||
@@ -237,8 +237,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
|||||||
|
|
||||||
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
|
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
|
||||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
|
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
|
||||||
cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(job.RawMetaData) == 0 {
|
if len(job.RawMetaData) == 0 {
|
||||||
@@ -246,8 +246,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
|
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
|
||||||
cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||||
@@ -270,12 +270,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
|||||||
return fmt.Errorf("job cannot be nil")
|
return fmt.Errorf("job cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
cachekey := fmt.Sprintf("metadata:%d", job.ID)
|
cachekey := fmt.Sprintf("metadata:%d", *job.ID)
|
||||||
r.cache.Del(cachekey)
|
r.cache.Del(cachekey)
|
||||||
if job.MetaData == nil {
|
if job.MetaData == nil {
|
||||||
if _, err = r.FetchMetadata(job); err != nil {
|
if _, err = r.FetchMetadata(job); err != nil {
|
||||||
cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID)
|
cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", *job.ID)
|
||||||
return fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err)
|
return fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,16 +289,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
|
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
|
||||||
cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID)
|
cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", *job.ID)
|
||||||
return fmt.Errorf("failed to marshal metadata for job %d: %w", job.ID, err)
|
return fmt.Errorf("failed to marshal metadata for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = sq.Update("job").
|
if _, err = sq.Update("job").
|
||||||
Set("meta_data", job.RawMetaData).
|
Set("meta_data", job.RawMetaData).
|
||||||
Where("job.id = ?", job.ID).
|
Where("job.id = ?", job.ID).
|
||||||
RunWith(r.stmtCache).Exec(); err != nil {
|
RunWith(r.stmtCache).Exec(); err != nil {
|
||||||
cclog.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
|
cclog.Warnf("Error while updating metadata for job, DB ID '%v'", *job.ID)
|
||||||
return fmt.Errorf("failed to update metadata in database for job %d: %w", job.ID, err)
|
return fmt.Errorf("failed to update metadata in database for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||||
@@ -324,8 +324,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
|
|||||||
|
|
||||||
if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID).
|
if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID).
|
||||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil {
|
RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil {
|
||||||
cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(job.RawFootprint) == 0 {
|
if len(job.RawFootprint) == 0 {
|
||||||
@@ -333,8 +333,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
|
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
|
||||||
cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("Timer FetchFootprint %s", time.Since(start))
|
cclog.Debugf("Timer FetchFootprint %s", time.Since(start))
|
||||||
@@ -357,7 +357,7 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
|||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
cachekey := fmt.Sprintf("energyFootprint:%d", job.ID)
|
cachekey := fmt.Sprintf("energyFootprint:%d", *job.ID)
|
||||||
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
||||||
job.EnergyFootprint = cached.(map[string]float64)
|
job.EnergyFootprint = cached.(map[string]float64)
|
||||||
return job.EnergyFootprint, nil
|
return job.EnergyFootprint, nil
|
||||||
@@ -365,8 +365,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
|||||||
|
|
||||||
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
|
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
|
||||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
|
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
|
||||||
cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(job.RawEnergyFootprint) == 0 {
|
if len(job.RawEnergyFootprint) == 0 {
|
||||||
@@ -374,8 +374,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
|
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
|
||||||
cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err)
|
cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", *job.ID, err)
|
||||||
return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err)
|
return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", *job.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)
|
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)
|
||||||
|
|||||||
@@ -190,6 +190,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
|||||||
if filter.Cluster != nil {
|
if filter.Cluster != nil {
|
||||||
query = buildStringCondition("job.cluster", filter.Cluster, query)
|
query = buildStringCondition("job.cluster", filter.Cluster, query)
|
||||||
}
|
}
|
||||||
|
if filter.SubCluster != nil {
|
||||||
|
query = buildStringCondition("job.subcluster", filter.SubCluster, query)
|
||||||
|
}
|
||||||
if filter.Partition != nil {
|
if filter.Partition != nil {
|
||||||
query = buildStringCondition("job.cluster_partition", filter.Partition, query)
|
query = buildStringCondition("job.cluster_partition", filter.Partition, query)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -254,8 +254,8 @@ func (r *NodeRepository) QueryNodes(
|
|||||||
if f.Cluster != nil {
|
if f.Cluster != nil {
|
||||||
query = buildStringCondition("cluster", f.Cluster, query)
|
query = buildStringCondition("cluster", f.Cluster, query)
|
||||||
}
|
}
|
||||||
if f.Subcluster != nil {
|
if f.SubCluster != nil {
|
||||||
query = buildStringCondition("subcluster", f.Subcluster, query)
|
query = buildStringCondition("subcluster", f.SubCluster, query)
|
||||||
}
|
}
|
||||||
if f.Hostname != nil {
|
if f.Hostname != nil {
|
||||||
query = buildStringCondition("hostname", f.Hostname, query)
|
query = buildStringCondition("hostname", f.Hostname, query)
|
||||||
@@ -322,8 +322,8 @@ func (r *NodeRepository) CountNodes(
|
|||||||
if f.Cluster != nil {
|
if f.Cluster != nil {
|
||||||
query = buildStringCondition("cluster", f.Cluster, query)
|
query = buildStringCondition("cluster", f.Cluster, query)
|
||||||
}
|
}
|
||||||
if f.Subcluster != nil {
|
if f.SubCluster != nil {
|
||||||
query = buildStringCondition("subcluster", f.Subcluster, query)
|
query = buildStringCondition("subcluster", f.SubCluster, query)
|
||||||
}
|
}
|
||||||
if f.Hostname != nil {
|
if f.Hostname != nil {
|
||||||
query = buildStringCondition("hostname", f.Hostname, query)
|
query = buildStringCondition("hostname", f.Hostname, query)
|
||||||
@@ -440,8 +440,8 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF
|
|||||||
if f.Cluster != nil {
|
if f.Cluster != nil {
|
||||||
query = buildStringCondition("cluster", f.Cluster, query)
|
query = buildStringCondition("cluster", f.Cluster, query)
|
||||||
}
|
}
|
||||||
if f.Subcluster != nil {
|
if f.SubCluster != nil {
|
||||||
query = buildStringCondition("subcluster", f.Subcluster, query)
|
query = buildStringCondition("subcluster", f.SubCluster, query)
|
||||||
}
|
}
|
||||||
if f.SchedulerState != nil {
|
if f.SchedulerState != nil {
|
||||||
query = query.Where("node_state = ?", f.SchedulerState)
|
query = query.Where("node_state = ?", f.SchedulerState)
|
||||||
@@ -504,8 +504,8 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
|
|||||||
if f.Cluster != nil {
|
if f.Cluster != nil {
|
||||||
query = buildStringCondition("cluster", f.Cluster, query)
|
query = buildStringCondition("cluster", f.Cluster, query)
|
||||||
}
|
}
|
||||||
if f.Subcluster != nil {
|
if f.SubCluster != nil {
|
||||||
query = buildStringCondition("subcluster", f.Subcluster, query)
|
query = buildStringCondition("subcluster", f.SubCluster, query)
|
||||||
}
|
}
|
||||||
if f.SchedulerState != nil {
|
if f.SchedulerState != nil {
|
||||||
query = query.Where("node_state = ?", f.SchedulerState)
|
query = query.Where("node_state = ?", f.SchedulerState)
|
||||||
@@ -573,7 +573,7 @@ func (r *NodeRepository) GetNodesForList(
|
|||||||
queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
|
queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
|
||||||
}
|
}
|
||||||
if subCluster != "" {
|
if subCluster != "" {
|
||||||
queryFilters = append(queryFilters, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}})
|
queryFilters = append(queryFilters, &model.NodeFilter{SubCluster: &model.StringInput{Eq: &subCluster}})
|
||||||
}
|
}
|
||||||
if nodeFilter != "" && stateFilter != "notindb" {
|
if nodeFilter != "" && stateFilter != "notindb" {
|
||||||
queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}})
|
queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}})
|
||||||
|
|||||||
@@ -196,7 +196,7 @@ func (r *JobRepository) buildStatsQuery(
|
|||||||
// - filter: Filters to apply (time range, cluster, job state, etc.)
|
// - filter: Filters to apply (time range, cluster, job state, etc.)
|
||||||
// - page: Optional pagination (ItemsPerPage: -1 disables pagination)
|
// - page: Optional pagination (ItemsPerPage: -1 disables pagination)
|
||||||
// - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
|
// - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
|
||||||
// - groupBy: Required grouping dimension (User, Project, Cluster, or Subcluster)
|
// - groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster)
|
||||||
//
|
//
|
||||||
// Returns a slice of JobsStatistics, one per group, with:
|
// Returns a slice of JobsStatistics, one per group, with:
|
||||||
// - ID: The group identifier (username, project name, cluster name, etc.)
|
// - ID: The group identifier (username, project name, cluster name, etc.)
|
||||||
@@ -420,7 +420,7 @@ func LoadJobStat(job *schema.Job, metric string, statType string) float64 {
|
|||||||
// Parameters:
|
// Parameters:
|
||||||
// - ctx: Context for security checks
|
// - ctx: Context for security checks
|
||||||
// - filter: Filters to apply
|
// - filter: Filters to apply
|
||||||
// - groupBy: Grouping dimension (User, Project, Cluster, or Subcluster)
|
// - groupBy: Grouping dimension (User, Project, Cluster, or SubCluster)
|
||||||
//
|
//
|
||||||
// Returns JobsStatistics with only ID and TotalJobs populated for each group.
|
// Returns JobsStatistics with only ID and TotalJobs populated for each group.
|
||||||
func (r *JobRepository) JobCountGrouped(
|
func (r *JobRepository) JobCountGrouped(
|
||||||
|
|||||||
@@ -455,6 +455,8 @@ func (r *JobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagNam
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cclog.Infof("Adding tag %s:%s:%s (direct)", tagType, tagName, tagScope)
|
||||||
|
|
||||||
if _, err := r.AddTagDirect(jobID, tagID); err != nil {
|
if _, err := r.AddTagDirect(jobID, tagID); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ const (
|
|||||||
|
|
||||||
type appInfo struct {
|
type appInfo struct {
|
||||||
tag string
|
tag string
|
||||||
strings []string
|
patterns []*regexp.Regexp
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppTagger detects applications by matching patterns in job scripts.
|
// AppTagger detects applications by matching patterns in job scripts.
|
||||||
@@ -38,8 +38,8 @@ type appInfo struct {
|
|||||||
// configuration when files change. When a job script matches a pattern,
|
// configuration when files change. When a job script matches a pattern,
|
||||||
// the corresponding application tag is automatically applied.
|
// the corresponding application tag is automatically applied.
|
||||||
type AppTagger struct {
|
type AppTagger struct {
|
||||||
// apps maps application tags to their matching patterns
|
// apps holds application patterns in deterministic order
|
||||||
apps map[string]appInfo
|
apps []appInfo
|
||||||
// tagType is the type of tag ("app")
|
// tagType is the type of tag ("app")
|
||||||
tagType string
|
tagType string
|
||||||
// cfgPath is the path to watch for configuration changes
|
// cfgPath is the path to watch for configuration changes
|
||||||
@@ -48,13 +48,27 @@ type AppTagger struct {
|
|||||||
|
|
||||||
func (t *AppTagger) scanApp(f *os.File, fns string) {
|
func (t *AppTagger) scanApp(f *os.File, fns string) {
|
||||||
scanner := bufio.NewScanner(f)
|
scanner := bufio.NewScanner(f)
|
||||||
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
|
tag := strings.TrimSuffix(fns, filepath.Ext(fns))
|
||||||
|
ai := appInfo{tag: tag, patterns: make([]*regexp.Regexp, 0)}
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
ai.strings = append(ai.strings, scanner.Text())
|
line := scanner.Text()
|
||||||
|
re, err := regexp.Compile(line)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("invalid regex pattern '%s' in %s: %v", line, fns, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
delete(t.apps, ai.tag)
|
ai.patterns = append(ai.patterns, re)
|
||||||
t.apps[ai.tag] = ai
|
}
|
||||||
|
|
||||||
|
// Remove existing entry for this tag if present
|
||||||
|
for i, a := range t.apps {
|
||||||
|
if a.tag == tag {
|
||||||
|
t.apps = append(t.apps[:i], t.apps[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.apps = append(t.apps, ai)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventMatch checks if a filesystem event should trigger configuration reload.
|
// EventMatch checks if a filesystem event should trigger configuration reload.
|
||||||
@@ -65,7 +79,6 @@ func (t *AppTagger) EventMatch(s string) bool {
|
|||||||
|
|
||||||
// EventCallback is triggered when the configuration directory changes.
|
// EventCallback is triggered when the configuration directory changes.
|
||||||
// It reloads all application pattern files from the watched directory.
|
// It reloads all application pattern files from the watched directory.
|
||||||
// FIXME: Only process the file that caused the event
|
|
||||||
func (t *AppTagger) EventCallback() {
|
func (t *AppTagger) EventCallback() {
|
||||||
files, err := os.ReadDir(t.cfgPath)
|
files, err := os.ReadDir(t.cfgPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -81,7 +94,9 @@ func (t *AppTagger) EventCallback() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
t.scanApp(f, fns)
|
t.scanApp(f, fns)
|
||||||
f.Close()
|
if err := f.Close(); err != nil {
|
||||||
|
cclog.Errorf("error closing app file %s: %#v", fns, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,7 +109,7 @@ func (t *AppTagger) Register() error {
|
|||||||
t.cfgPath = defaultConfigPath
|
t.cfgPath = defaultConfigPath
|
||||||
}
|
}
|
||||||
t.tagType = tagTypeApp
|
t.tagType = tagTypeApp
|
||||||
t.apps = make(map[string]appInfo, 0)
|
t.apps = make([]appInfo, 0)
|
||||||
|
|
||||||
if !util.CheckFileExists(t.cfgPath) {
|
if !util.CheckFileExists(t.cfgPath) {
|
||||||
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
|
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
|
||||||
@@ -114,7 +129,9 @@ func (t *AppTagger) Register() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
t.scanApp(f, fns)
|
t.scanApp(f, fns)
|
||||||
f.Close()
|
if err := f.Close(); err != nil {
|
||||||
|
cclog.Errorf("error closing app file %s: %#v", fns, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("Setup file watch for %s", t.cfgPath)
|
cclog.Infof("Setup file watch for %s", t.cfgPath)
|
||||||
@@ -139,15 +156,14 @@ func (t *AppTagger) Match(job *schema.Job) {
|
|||||||
jobscript, ok := metadata["jobScript"]
|
jobscript, ok := metadata["jobScript"]
|
||||||
if ok {
|
if ok {
|
||||||
id := *job.ID
|
id := *job.ID
|
||||||
|
jobscriptLower := strings.ToLower(jobscript)
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for _, a := range t.apps {
|
for _, a := range t.apps {
|
||||||
tag := a.tag
|
for _, re := range a.patterns {
|
||||||
for _, s := range a.strings {
|
if re.MatchString(jobscriptLower) {
|
||||||
matched, _ := regexp.MatchString(s, strings.ToLower(jobscript))
|
if !r.HasTag(id, t.tagType, a.tag) {
|
||||||
if matched {
|
r.AddTagOrCreateDirect(id, t.tagType, a.tag)
|
||||||
if !r.HasTag(id, t.tagType, tag) {
|
|
||||||
r.AddTagOrCreateDirect(id, t.tagType, tag)
|
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ func TestRegister(t *testing.T) {
|
|||||||
var tagger AppTagger
|
var tagger AppTagger
|
||||||
tagger.cfgPath = appsDir
|
tagger.cfgPath = appsDir
|
||||||
tagger.tagType = tagTypeApp
|
tagger.tagType = tagTypeApp
|
||||||
tagger.apps = make(map[string]appInfo, 0)
|
tagger.apps = make([]appInfo, 0)
|
||||||
|
|
||||||
files, err := os.ReadDir(appsDir)
|
files, err := os.ReadDir(appsDir)
|
||||||
noErr(t, err)
|
noErr(t, err)
|
||||||
@@ -97,7 +97,7 @@ func TestMatch(t *testing.T) {
|
|||||||
var tagger AppTagger
|
var tagger AppTagger
|
||||||
tagger.cfgPath = appsDir
|
tagger.cfgPath = appsDir
|
||||||
tagger.tagType = tagTypeApp
|
tagger.tagType = tagTypeApp
|
||||||
tagger.apps = make(map[string]appInfo, 0)
|
tagger.apps = make([]appInfo, 0)
|
||||||
|
|
||||||
files, err := os.ReadDir(appsDir)
|
files, err := os.ReadDir(appsDir)
|
||||||
noErr(t, err)
|
noErr(t, err)
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ func RunTaggers() error {
|
|||||||
tagger.Match(job)
|
tagger.Match(job)
|
||||||
}
|
}
|
||||||
for _, tagger := range jobTagger.stopTaggers {
|
for _, tagger := range jobTagger.stopTaggers {
|
||||||
cclog.Infof("Run stop tagger for job %d", job.ID)
|
cclog.Infof("Run stop tagger for job %d", *job.ID)
|
||||||
tagger.Match(job)
|
tagger.Match(job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func RegisterFootprintWorker() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// NOTE: Additional Subcluster Loop Could Allow For Limited List Of Footprint-Metrics Only.
|
// NOTE: Additional SubCluster Loop Could Allow For Limited List Of Footprint-Metrics Only.
|
||||||
// - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs
|
// - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs
|
||||||
// - Would Require Review of 'updateFootprint' Usage (Logic Could Possibly Be Included Here Completely)
|
// - Would Require Review of 'updateFootprint' Usage (Logic Could Possibly Be Included Here Completely)
|
||||||
allMetrics := make([]string, 0)
|
allMetrics := make([]string, 0)
|
||||||
@@ -113,7 +113,7 @@ func RegisterFootprintWorker() {
|
|||||||
stmt := sq.Update("job")
|
stmt := sq.Update("job")
|
||||||
stmt, err = jobRepo.UpdateFootprint(stmt, job)
|
stmt, err = jobRepo.UpdateFootprint(stmt, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error())
|
cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", *job.ID, err.Error())
|
||||||
ce++
|
ce++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,7 +59,7 @@
|
|||||||
const canvasPrefix = $derived(`${presetCluster}-${presetSubCluster ? presetSubCluster : ''}`)
|
const canvasPrefix = $derived(`${presetCluster}-${presetSubCluster ? presetSubCluster : ''}`)
|
||||||
|
|
||||||
const statusFilter = $derived(presetSubCluster
|
const statusFilter = $derived(presetSubCluster
|
||||||
? [{ state: ["running"] }, { cluster: { eq: presetCluster} }, { partition: { eq: presetSubCluster } }]
|
? [{ state: ["running"] }, { cluster: { eq: presetCluster} }, { subCluster: { eq: presetSubCluster } }]
|
||||||
: [{ state: ["running"] }, { cluster: { eq: presetCluster} }]
|
: [{ state: ["running"] }, { cluster: { eq: presetCluster} }]
|
||||||
);
|
);
|
||||||
const topJobsQuery = $derived(queryStore({
|
const topJobsQuery = $derived(queryStore({
|
||||||
|
|||||||
Reference in New Issue
Block a user