mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-28 21:37:31 +01:00
Merge branch 'dev' into optimize-checkpoint-loading
This commit is contained in:
2
Makefile
2
Makefile
@@ -84,4 +84,4 @@ $(VAR):
|
|||||||
|
|
||||||
$(SVELTE_TARGETS): $(SVELTE_SRC)
|
$(SVELTE_TARGETS): $(SVELTE_SRC)
|
||||||
$(info ===> BUILD frontend)
|
$(info ===> BUILD frontend)
|
||||||
cd web/frontend && npm install && npm run build
|
cd web/frontend && npm ci && npm run build
|
||||||
|
|||||||
@@ -11,15 +11,11 @@
|
|||||||
"job.duration > job_min_duration_seconds"
|
"job.duration > job_min_duration_seconds"
|
||||||
],
|
],
|
||||||
"variables": [
|
"variables": [
|
||||||
{
|
|
||||||
"name": "memory_threshold",
|
|
||||||
"expr": "mem_used.limits.peak * highmemoryusage_threshold_factor"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "memory_usage_pct",
|
"name": "memory_usage_pct",
|
||||||
"expr": "mem_used.max / mem_used.limits.peak * 100.0"
|
"expr": "mem_used.max / mem_used.limits.peak * 100.0"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"rule": "mem_used.max > memory_threshold",
|
"rule": "mem_used.max > memory_used.limits.alert",
|
||||||
"hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions."
|
"hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions."
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "Low CPU load",
|
"name": "Low CPU load",
|
||||||
"tag": "lowload",
|
"tag": "lowload",
|
||||||
"parameters": [
|
"parameters": ["lowcpuload_threshold_factor", "job_min_duration_seconds"],
|
||||||
"lowcpuload_threshold_factor",
|
|
||||||
"job_min_duration_seconds"
|
|
||||||
],
|
|
||||||
"metrics": ["cpu_load"],
|
"metrics": ["cpu_load"],
|
||||||
"requirements": [
|
"requirements": [
|
||||||
"job.shared == \"none\"",
|
"job.shared == \"none\"",
|
||||||
@@ -13,9 +10,9 @@
|
|||||||
"variables": [
|
"variables": [
|
||||||
{
|
{
|
||||||
"name": "load_threshold",
|
"name": "load_threshold",
|
||||||
"expr": "job.numCores * lowcpuload_threshold_factor"
|
"expr": "cpu_load.limits.peak * lowcpuload_threshold_factor"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"rule": "cpu_load.avg < load_threshold",
|
"rule": "cpu_load.avg < load_threshold",
|
||||||
"hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)."
|
"hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}})."
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -9,7 +9,7 @@ tool (
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/99designs/gqlgen v0.17.86
|
github.com/99designs/gqlgen v0.17.86
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.5.1
|
github.com/ClusterCockpit/cc-lib/v2 v2.6.0
|
||||||
github.com/Masterminds/squirrel v1.5.4
|
github.com/Masterminds/squirrel v1.5.4
|
||||||
github.com/aws/aws-sdk-go-v2 v1.41.1
|
github.com/aws/aws-sdk-go-v2 v1.41.1
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.32.8
|
github.com/aws/aws-sdk-go-v2/config v1.32.8
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+
|
|||||||
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.5.1 h1:s6M9tyPDty+4zTdQGJYKpGJM9Nz7N6ITMdjPvNSLX5g=
|
github.com/ClusterCockpit/cc-lib/v2 v2.5.1 h1:s6M9tyPDty+4zTdQGJYKpGJM9Nz7N6ITMdjPvNSLX5g=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.5.1/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0=
|
github.com/ClusterCockpit/cc-lib/v2 v2.5.1/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0=
|
||||||
|
github.com/ClusterCockpit/cc-lib/v2 v2.6.0 h1:Q7zvRAVhfYA9PDB18pfY9A/6Ws4oWpnv8+P9MBRUDzg=
|
||||||
|
github.com/ClusterCockpit/cc-lib/v2 v2.6.0/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0=
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||||
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
||||||
|
|||||||
@@ -70,14 +70,15 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
resolution int,
|
resolution int,
|
||||||
) ([]APIQuery, []schema.MetricScope, error) {
|
) ([]APIQuery, []schema.MetricScope, error) {
|
||||||
|
// Initialize both slices together
|
||||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||||
assignedScope := []schema.MetricScope{}
|
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||||
|
|
||||||
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
|
topology, err := ccms.getTopology(job.Cluster, job.SubCluster)
|
||||||
if scerr != nil {
|
if err != nil {
|
||||||
return nil, nil, scerr
|
cclog.Errorf("could not load cluster %s subCluster %s topology: %s", job.Cluster, job.SubCluster, err.Error())
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
topology := subcluster.Topology
|
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
remoteName := metric
|
remoteName := metric
|
||||||
@@ -128,7 +129,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
hostQueries, hostScopes := buildScopeQueries(
|
hostQueries, hostScopes := buildScopeQueries(
|
||||||
nativeScope, requestedScope,
|
nativeScope, requestedScope,
|
||||||
remoteName, host.Hostname,
|
remoteName, host.Hostname,
|
||||||
&topology, hwthreads, host.Accelerators,
|
topology, hwthreads, host.Accelerators,
|
||||||
resolution,
|
resolution,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -163,19 +164,9 @@ func (ccms *CCMetricStore) buildNodeQueries(
|
|||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
resolution int,
|
resolution int,
|
||||||
) ([]APIQuery, []schema.MetricScope, error) {
|
) ([]APIQuery, []schema.MetricScope, error) {
|
||||||
|
// Initialize both slices together
|
||||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
||||||
assignedScope := []schema.MetricScope{}
|
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes))
|
||||||
|
|
||||||
// Get Topol before loop if subCluster given
|
|
||||||
var subClusterTopol *schema.SubCluster
|
|
||||||
var scterr error
|
|
||||||
if subCluster != "" {
|
|
||||||
subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster)
|
|
||||||
if scterr != nil {
|
|
||||||
cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error())
|
|
||||||
return nil, nil, scterr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
remoteName := metric
|
remoteName := metric
|
||||||
@@ -215,22 +206,22 @@ func (ccms *CCMetricStore) buildNodeQueries(
|
|||||||
handledScopes = append(handledScopes, scope)
|
handledScopes = append(handledScopes, scope)
|
||||||
|
|
||||||
for _, hostname := range nodes {
|
for _, hostname := range nodes {
|
||||||
|
var topology *schema.Topology
|
||||||
|
var err error
|
||||||
|
|
||||||
// If no subCluster given, get it by node
|
// If no subCluster given, get it by node
|
||||||
if subCluster == "" {
|
if subCluster == "" {
|
||||||
subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname)
|
topology, err = ccms.getTopologyByNode(cluster, hostname)
|
||||||
if scnerr != nil {
|
} else {
|
||||||
return nil, nil, scnerr
|
topology, err = ccms.getTopology(cluster, subCluster)
|
||||||
}
|
|
||||||
subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName)
|
|
||||||
if scterr != nil {
|
|
||||||
return nil, nil, scterr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable
|
// Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable
|
||||||
// Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable
|
// Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable
|
||||||
topology := subClusterTopol.Topology
|
|
||||||
acceleratorIds := topology.GetAcceleratorIDs()
|
acceleratorIds := topology.GetAcceleratorIDs()
|
||||||
|
|
||||||
// Moved check here if metric matches hardware specs
|
// Moved check here if metric matches hardware specs
|
||||||
@@ -241,7 +232,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
|
|||||||
nodeQueries, nodeScopes := buildScopeQueries(
|
nodeQueries, nodeScopes := buildScopeQueries(
|
||||||
nativeScope, requestedScope,
|
nativeScope, requestedScope,
|
||||||
remoteName, hostname,
|
remoteName, hostname,
|
||||||
&topology, topology.Node, acceleratorIds,
|
topology, topology.Node, acceleratorIds,
|
||||||
resolution,
|
resolution,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -278,7 +269,6 @@ func buildScopeQueries(
|
|||||||
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
|
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
|
||||||
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
|
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
|
||||||
if scope != schema.MetricScopeAccelerator {
|
if scope != schema.MetricScopeAccelerator {
|
||||||
// Skip all other caught cases
|
|
||||||
return queries, scopes
|
return queries, scopes
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -451,6 +441,31 @@ func buildScopeQueries(
|
|||||||
return queries, scopes
|
return queries, scopes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MemoryDomain -> Socket
|
||||||
|
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
|
||||||
|
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
|
||||||
|
socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("Error mapping memory domains to sockets, return unchanged: %v", err)
|
||||||
|
return queries, scopes
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a query for each socket
|
||||||
|
for _, domains := range socketToDomains {
|
||||||
|
queries = append(queries, APIQuery{
|
||||||
|
Metric: metric,
|
||||||
|
Hostname: hostname,
|
||||||
|
Aggregate: true,
|
||||||
|
Type: &memoryDomainString,
|
||||||
|
TypeIds: intToStringSlice(domains),
|
||||||
|
Resolution: resolution,
|
||||||
|
})
|
||||||
|
// Add scope for each query, not just once
|
||||||
|
scopes = append(scopes, scope)
|
||||||
|
}
|
||||||
|
return queries, scopes
|
||||||
|
}
|
||||||
|
|
||||||
// Socket -> Socket
|
// Socket -> Socket
|
||||||
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
|
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
|
||||||
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
|
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
|
||||||
|
|||||||
@@ -75,6 +75,7 @@ type CCMetricStore struct {
|
|||||||
jwt string // JWT Bearer token for authentication
|
jwt string // JWT Bearer token for authentication
|
||||||
url string // Base URL of cc-metric-store instance
|
url string // Base URL of cc-metric-store instance
|
||||||
queryEndpoint string // Full URL to query API endpoint
|
queryEndpoint string // Full URL to query API endpoint
|
||||||
|
topologyCache map[string]*schema.Topology // cluster -> topology cache
|
||||||
}
|
}
|
||||||
|
|
||||||
// APIQueryRequest represents a request to the cc-metric-store query API.
|
// APIQueryRequest represents a request to the cc-metric-store query API.
|
||||||
@@ -133,6 +134,7 @@ func NewCCMetricStore(url string, token string) *CCMetricStore {
|
|||||||
client: http.Client{
|
client: http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
},
|
},
|
||||||
|
topologyCache: make(map[string]*schema.Topology),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -185,6 +187,32 @@ func (ccms *CCMetricStore) doRequest(
|
|||||||
return &resBody, nil
|
return &resBody, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getTopology returns the topology for a given cluster and subcluster, caching it if not already present
|
||||||
|
func (ccms *CCMetricStore) getTopology(cluster, subCluster string) (*schema.Topology, error) {
|
||||||
|
cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
|
||||||
|
if topology, ok := ccms.topologyCache[cacheKey]; ok {
|
||||||
|
return topology, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
subcluster, err := archive.GetSubCluster(cluster, subCluster)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ccms.topologyCache[cacheKey] = &subcluster.Topology
|
||||||
|
return &subcluster.Topology, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTopologyByNode returns the topology for a given cluster and node, caching it if not already present
|
||||||
|
func (ccms *CCMetricStore) getTopologyByNode(cluster, node string) (*schema.Topology, error) {
|
||||||
|
subCluster, err := archive.GetSubClusterByNode(cluster, node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ccms.getTopology(cluster, subCluster)
|
||||||
|
}
|
||||||
|
|
||||||
// LoadData retrieves time series data and statistics for the specified job and metrics.
|
// LoadData retrieves time series data and statistics for the specified job and metrics.
|
||||||
// It queries data for the job's time range and resources, handling scope transformations automatically.
|
// It queries data for the job's time range and resources, handling scope transformations automatically.
|
||||||
//
|
//
|
||||||
@@ -210,6 +238,12 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify assignment is correct - log any inconsistencies for debugging
|
||||||
|
if len(queries) != len(assignedScope) {
|
||||||
|
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
|
||||||
|
len(queries), len(assignedScope))
|
||||||
|
}
|
||||||
|
|
||||||
req := APIQueryRequest{
|
req := APIQueryRequest{
|
||||||
Cluster: job.Cluster,
|
Cluster: job.Cluster,
|
||||||
From: job.StartTime,
|
From: job.StartTime,
|
||||||
@@ -227,11 +261,37 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
|
|
||||||
var errors []string
|
var errors []string
|
||||||
jobData := make(schema.JobData)
|
jobData := make(schema.JobData)
|
||||||
|
|
||||||
|
// Add safety check for potential index out of range errors
|
||||||
|
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
|
||||||
|
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
|
||||||
|
len(req.Queries), len(resBody.Results), len(assignedScope))
|
||||||
|
if len(resBody.Results) > len(req.Queries) {
|
||||||
|
resBody.Results = resBody.Results[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
if len(assignedScope) > len(req.Queries) {
|
||||||
|
assignedScope = assignedScope[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i, row := range resBody.Results {
|
for i, row := range resBody.Results {
|
||||||
|
// Safety check to prevent index out of range errors
|
||||||
|
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||||
|
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||||
|
i, len(req.Queries), len(assignedScope))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
query := req.Queries[i]
|
query := req.Queries[i]
|
||||||
metric := query.Metric
|
metric := query.Metric
|
||||||
scope := assignedScope[i]
|
scope := assignedScope[i]
|
||||||
mc := archive.GetMetricConfig(job.Cluster, metric)
|
mc := archive.GetMetricConfig(job.Cluster, metric)
|
||||||
|
|
||||||
|
if mc == nil {
|
||||||
|
cclog.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if _, ok := jobData[metric]; !ok {
|
if _, ok := jobData[metric]; !ok {
|
||||||
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||||
}
|
}
|
||||||
@@ -260,8 +320,15 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||||
@@ -412,8 +479,15 @@ func (ccms *CCMetricStore) LoadScopedStats(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||||
@@ -561,6 +635,12 @@ func (ccms *CCMetricStore) LoadNodeListData(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify assignment is correct - log any inconsistencies for debugging
|
||||||
|
if len(queries) != len(assignedScope) {
|
||||||
|
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildNodeQueries: %d vs %d",
|
||||||
|
len(queries), len(assignedScope))
|
||||||
|
}
|
||||||
|
|
||||||
req := APIQueryRequest{
|
req := APIQueryRequest{
|
||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
Queries: queries,
|
Queries: queries,
|
||||||
@@ -578,17 +658,47 @@ func (ccms *CCMetricStore) LoadNodeListData(
|
|||||||
|
|
||||||
var errors []string
|
var errors []string
|
||||||
data := make(map[string]schema.JobData)
|
data := make(map[string]schema.JobData)
|
||||||
|
|
||||||
|
// Add safety check for index out of range issues
|
||||||
|
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
|
||||||
|
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
|
||||||
|
len(req.Queries), len(resBody.Results), len(assignedScope))
|
||||||
|
if len(resBody.Results) > len(req.Queries) {
|
||||||
|
resBody.Results = resBody.Results[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
if len(assignedScope) > len(req.Queries) {
|
||||||
|
assignedScope = assignedScope[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i, row := range resBody.Results {
|
for i, row := range resBody.Results {
|
||||||
|
// Safety check to prevent index out of range errors
|
||||||
|
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||||
|
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||||
|
i, len(req.Queries), len(assignedScope))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
var query APIQuery
|
var query APIQuery
|
||||||
if resBody.Queries != nil {
|
if resBody.Queries != nil {
|
||||||
|
if i < len(resBody.Queries) {
|
||||||
query = resBody.Queries[i]
|
query = resBody.Queries[i]
|
||||||
|
} else {
|
||||||
|
cclog.Warnf("Index out of range prevented for resBody.Queries: i=%d, len=%d",
|
||||||
|
i, len(resBody.Queries))
|
||||||
|
continue
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
query = req.Queries[i]
|
query = req.Queries[i]
|
||||||
}
|
}
|
||||||
// qdata := res[0]
|
|
||||||
metric := query.Metric
|
metric := query.Metric
|
||||||
scope := assignedScope[i]
|
scope := assignedScope[i]
|
||||||
mc := archive.GetMetricConfig(cluster, metric)
|
mc := archive.GetMetricConfig(cluster, metric)
|
||||||
|
if mc == nil {
|
||||||
|
cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
res := mc.Timestep
|
res := mc.Timestep
|
||||||
if len(row) > 0 {
|
if len(row) > 0 {
|
||||||
@@ -627,8 +737,15 @@ func (ccms *CCMetricStore) LoadNodeListData(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||||
|
|||||||
@@ -190,6 +190,8 @@ func (t *JobClassTagger) EventCallback() {
|
|||||||
cclog.Fatal(err)
|
cclog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.rules = make(map[string]ruleInfo)
|
||||||
|
|
||||||
parametersFile := filepath.Join(t.cfgPath, parametersFileName)
|
parametersFile := filepath.Join(t.cfgPath, parametersFileName)
|
||||||
if util.CheckFileExists(parametersFile) {
|
if util.CheckFileExists(parametersFile) {
|
||||||
cclog.Info("Merge parameters")
|
cclog.Info("Merge parameters")
|
||||||
@@ -301,8 +303,9 @@ func (t *JobClassTagger) Register() error {
|
|||||||
// - Shared parameters defined in parameters.json
|
// - Shared parameters defined in parameters.json
|
||||||
// - Computed variables from the rule definition
|
// - Computed variables from the rule definition
|
||||||
//
|
//
|
||||||
// Rules are evaluated in arbitrary order. If multiple rules match, only the first
|
// Rules are evaluated in arbitrary order. Multiple rules can match and apply
|
||||||
// encountered match is applied (FIXME: this should handle multiple matches).
|
// their tags to the same job. Hint messages from all matching rules are collected
|
||||||
|
// and stored as a combined message in the job metadata.
|
||||||
func (t *JobClassTagger) Match(job *schema.Job) {
|
func (t *JobClassTagger) Match(job *schema.Job) {
|
||||||
jobStats, err := t.getStatistics(job)
|
jobStats, err := t.getStatistics(job)
|
||||||
metricsList := t.getMetricConfig(job.Cluster, job.SubCluster)
|
metricsList := t.getMetricConfig(job.Cluster, job.SubCluster)
|
||||||
@@ -312,6 +315,9 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
id := *job.ID
|
||||||
|
var messages []string
|
||||||
|
|
||||||
for tag, ri := range t.rules {
|
for tag, ri := range t.rules {
|
||||||
env := make(map[string]any)
|
env := make(map[string]any)
|
||||||
maps.Copy(env, ri.env)
|
maps.Copy(env, ri.env)
|
||||||
@@ -329,11 +335,13 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// add metrics to env
|
// add metrics to env
|
||||||
|
skipRule := false
|
||||||
for _, m := range ri.metrics {
|
for _, m := range ri.metrics {
|
||||||
stats, ok := jobStats[m]
|
stats, ok := jobStats[m]
|
||||||
if !ok {
|
if !ok {
|
||||||
cclog.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m)
|
cclog.Errorf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID)
|
||||||
return
|
skipRule = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
env[m] = map[string]any{
|
env[m] = map[string]any{
|
||||||
"min": stats.Min,
|
"min": stats.Min,
|
||||||
@@ -347,44 +355,55 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if skipRule {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// check rule requirements apply
|
// check rule requirements apply
|
||||||
|
requirementsMet := true
|
||||||
for _, r := range ri.requirements {
|
for _, r := range ri.requirements {
|
||||||
ok, err := expr.Run(r, env)
|
ok, err := expr.Run(r, env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("error running requirement for rule %s: %#v", tag, err)
|
cclog.Errorf("error running requirement for rule %s: %#v", tag, err)
|
||||||
return
|
requirementsMet = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if !ok.(bool) {
|
if !ok.(bool) {
|
||||||
cclog.Infof("requirement for rule %s not met", tag)
|
cclog.Infof("requirement for rule %s not met", tag)
|
||||||
return
|
requirementsMet = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !requirementsMet {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// validate rule expression
|
// evaluate rule variables
|
||||||
|
varError := false
|
||||||
for _, v := range ri.variables {
|
for _, v := range ri.variables {
|
||||||
value, err := expr.Run(v.expr, env)
|
value, err := expr.Run(v.expr, env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("error running rule %s: %#v", tag, err)
|
cclog.Errorf("error evaluating variable %s for rule %s: %#v", v.name, tag, err)
|
||||||
return
|
varError = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
env[v.name] = value
|
env[v.name] = value
|
||||||
}
|
}
|
||||||
|
if varError {
|
||||||
// dump.P(env)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
match, err := expr.Run(ri.rule, env)
|
match, err := expr.Run(ri.rule, env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("error running rule %s: %#v", tag, err)
|
cclog.Errorf("error running rule %s: %#v", tag, err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
if match.(bool) {
|
if match.(bool) {
|
||||||
cclog.Info("Rule matches!")
|
cclog.Info("Rule matches!")
|
||||||
id := *job.ID
|
|
||||||
if !t.repo.HasTag(id, t.tagType, tag) {
|
if !t.repo.HasTag(id, t.tagType, tag) {
|
||||||
_, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag)
|
if _, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag); err != nil {
|
||||||
if err != nil {
|
cclog.Errorf("failed to add tag '%s' to job %d: %v", tag, id, err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -392,17 +411,18 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
var msg bytes.Buffer
|
var msg bytes.Buffer
|
||||||
if err := ri.hint.Execute(&msg, env); err != nil {
|
if err := ri.hint.Execute(&msg, env); err != nil {
|
||||||
cclog.Errorf("Template error: %s", err.Error())
|
cclog.Errorf("Template error: %s", err.Error())
|
||||||
return
|
continue
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: Handle case where multiple tags apply
|
|
||||||
// FIXME: Handle case where multiple tags apply
|
|
||||||
err = t.repo.UpdateMetadata(job, "message", msg.String())
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
messages = append(messages, msg.String())
|
||||||
} else {
|
} else {
|
||||||
cclog.Info("Rule does not match!")
|
cclog.Info("Rule does not match!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(messages) > 0 {
|
||||||
|
combined := strings.Join(messages, "\n")
|
||||||
|
if err := t.repo.UpdateMetadata(job, "message", combined); err != nil {
|
||||||
|
cclog.Errorf("failed to update metadata for job %d: %v", *job.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,6 +98,8 @@ func (t *AppTagger) EventCallback() {
|
|||||||
cclog.Fatal(err)
|
cclog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.apps = make([]appInfo, 0)
|
||||||
|
|
||||||
for _, fn := range files {
|
for _, fn := range files {
|
||||||
if fn.IsDir() {
|
if fn.IsDir() {
|
||||||
continue
|
continue
|
||||||
@@ -163,7 +165,7 @@ func (t *AppTagger) Register() error {
|
|||||||
// It fetches the job metadata, extracts the job script, and matches it against
|
// It fetches the job metadata, extracts the job script, and matches it against
|
||||||
// all configured application patterns using regular expressions.
|
// all configured application patterns using regular expressions.
|
||||||
// If a match is found, the corresponding application tag is added to the job.
|
// If a match is found, the corresponding application tag is added to the job.
|
||||||
// Only the first matching application is tagged.
|
// Multiple application tags can be applied if patterns for different apps match.
|
||||||
func (t *AppTagger) Match(job *schema.Job) {
|
func (t *AppTagger) Match(job *schema.Job) {
|
||||||
r := repository.GetJobRepository()
|
r := repository.GetJobRepository()
|
||||||
|
|
||||||
@@ -199,6 +201,7 @@ func (t *AppTagger) Match(job *schema.Job) {
|
|||||||
jobscriptLower := strings.ToLower(jobscript)
|
jobscriptLower := strings.ToLower(jobscript)
|
||||||
cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps))
|
cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps))
|
||||||
|
|
||||||
|
matched := false
|
||||||
for _, a := range t.apps {
|
for _, a := range t.apps {
|
||||||
for _, re := range a.patterns {
|
for _, re := range a.patterns {
|
||||||
if re.MatchString(jobscriptLower) {
|
if re.MatchString(jobscriptLower) {
|
||||||
@@ -210,10 +213,13 @@ func (t *AppTagger) Match(job *schema.Job) {
|
|||||||
cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
|
cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
matched = true
|
||||||
|
break // matched this app, move to next app
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !matched {
|
||||||
cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster)
|
cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -93,6 +93,12 @@ func (ccms *InternalMetricStore) LoadData(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify assignment is correct - log any inconsistencies for debugging
|
||||||
|
if len(queries) != len(assignedScope) {
|
||||||
|
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
|
||||||
|
len(queries), len(assignedScope))
|
||||||
|
}
|
||||||
|
|
||||||
req := APIQueryRequest{
|
req := APIQueryRequest{
|
||||||
Cluster: job.Cluster,
|
Cluster: job.Cluster,
|
||||||
From: job.StartTime,
|
From: job.StartTime,
|
||||||
@@ -110,9 +116,24 @@ func (ccms *InternalMetricStore) LoadData(
|
|||||||
|
|
||||||
var errors []string
|
var errors []string
|
||||||
jobData := make(schema.JobData)
|
jobData := make(schema.JobData)
|
||||||
|
|
||||||
|
// Add safety check for potential index out of range errors
|
||||||
|
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
|
||||||
|
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
|
||||||
|
len(req.Queries), len(resBody.Results), len(assignedScope))
|
||||||
|
if len(resBody.Results) > len(req.Queries) {
|
||||||
|
resBody.Results = resBody.Results[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
if len(assignedScope) > len(req.Queries) {
|
||||||
|
assignedScope = assignedScope[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i, row := range resBody.Results {
|
for i, row := range resBody.Results {
|
||||||
if len(row) == 0 {
|
// Safety check to prevent index out of range errors
|
||||||
// No Data Found For Metric, Logged in FetchData to Warn
|
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||||
|
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||||
|
i, len(req.Queries), len(assignedScope))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,6 +141,12 @@ func (ccms *InternalMetricStore) LoadData(
|
|||||||
metric := query.Metric
|
metric := query.Metric
|
||||||
scope := assignedScope[i]
|
scope := assignedScope[i]
|
||||||
mc := archive.GetMetricConfig(job.Cluster, metric)
|
mc := archive.GetMetricConfig(job.Cluster, metric)
|
||||||
|
|
||||||
|
if mc == nil {
|
||||||
|
cclog.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if _, ok := jobData[metric]; !ok {
|
if _, ok := jobData[metric]; !ok {
|
||||||
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||||
}
|
}
|
||||||
@@ -148,8 +175,15 @@ func (ccms *InternalMetricStore) LoadData(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res)
|
sanitizeStats(&res)
|
||||||
@@ -650,8 +684,15 @@ func (ccms *InternalMetricStore) LoadScopedStats(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res)
|
sanitizeStats(&res)
|
||||||
@@ -823,6 +864,12 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify assignment is correct - log any inconsistencies for debugging
|
||||||
|
if len(queries) != len(assignedScope) {
|
||||||
|
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildNodeQueries: %d vs %d",
|
||||||
|
len(queries), len(assignedScope))
|
||||||
|
}
|
||||||
|
|
||||||
req := APIQueryRequest{
|
req := APIQueryRequest{
|
||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
Queries: queries,
|
Queries: queries,
|
||||||
@@ -840,14 +887,36 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
|||||||
|
|
||||||
var errors []string
|
var errors []string
|
||||||
data := make(map[string]schema.JobData)
|
data := make(map[string]schema.JobData)
|
||||||
|
|
||||||
|
// Add safety check for index out of range issues
|
||||||
|
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
|
||||||
|
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
|
||||||
|
len(req.Queries), len(resBody.Results), len(assignedScope))
|
||||||
|
if len(resBody.Results) > len(req.Queries) {
|
||||||
|
resBody.Results = resBody.Results[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
if len(assignedScope) > len(req.Queries) {
|
||||||
|
assignedScope = assignedScope[:len(req.Queries)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i, row := range resBody.Results {
|
for i, row := range resBody.Results {
|
||||||
if len(row) == 0 {
|
// Safety check to prevent index out of range errors
|
||||||
// No Data Found For Metric, Logged in FetchData to Warn
|
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||||
|
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||||
|
i, len(req.Queries), len(assignedScope))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var query APIQuery
|
var query APIQuery
|
||||||
if resBody.Queries != nil {
|
if resBody.Queries != nil {
|
||||||
|
if i < len(resBody.Queries) {
|
||||||
query = resBody.Queries[i]
|
query = resBody.Queries[i]
|
||||||
|
} else {
|
||||||
|
cclog.Warnf("Index out of range prevented for resBody.Queries: i=%d, len=%d",
|
||||||
|
i, len(resBody.Queries))
|
||||||
|
continue
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
query = req.Queries[i]
|
query = req.Queries[i]
|
||||||
}
|
}
|
||||||
@@ -855,6 +924,10 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
|||||||
metric := query.Metric
|
metric := query.Metric
|
||||||
scope := assignedScope[i]
|
scope := assignedScope[i]
|
||||||
mc := archive.GetMetricConfig(cluster, metric)
|
mc := archive.GetMetricConfig(cluster, metric)
|
||||||
|
if mc == nil {
|
||||||
|
cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
res := mc.Timestep
|
res := mc.Timestep
|
||||||
if len(row) > 0 {
|
if len(row) > 0 {
|
||||||
@@ -893,8 +966,15 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
|||||||
|
|
||||||
id := (*string)(nil)
|
id := (*string)(nil)
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
|
// Check if ndx is within the bounds of TypeIds slice
|
||||||
|
if ndx < len(query.TypeIds) {
|
||||||
id = new(string)
|
id = new(string)
|
||||||
*id = query.TypeIds[ndx]
|
*id = query.TypeIds[ndx]
|
||||||
|
} else {
|
||||||
|
// Log the error but continue processing
|
||||||
|
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||||
|
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sanitizeStats(&res)
|
sanitizeStats(&res)
|
||||||
|
|||||||
@@ -61,6 +61,9 @@
|
|||||||
}
|
}
|
||||||
return colbase
|
return colbase
|
||||||
})
|
})
|
||||||
|
const sortedRows = $derived(
|
||||||
|
$stats.data ? sort($stats.data.rows, sorting, nameFilter) : []
|
||||||
|
);
|
||||||
|
|
||||||
let stats = $derived(
|
let stats = $derived(
|
||||||
queryStore({
|
queryStore({
|
||||||
@@ -87,6 +90,40 @@
|
|||||||
);
|
);
|
||||||
|
|
||||||
/* Functions */
|
/* Functions */
|
||||||
|
function exportCsv() {
|
||||||
|
const isUser = type === "USER";
|
||||||
|
const header = [
|
||||||
|
isUser ? "Username" : "Project",
|
||||||
|
...(isUser ? ["Name"] : []),
|
||||||
|
"Total Jobs",
|
||||||
|
"Short Jobs",
|
||||||
|
...(fetchRunning ? ["Total Cores", "Total Accelerators"] : []),
|
||||||
|
"Total Walltime",
|
||||||
|
"Total Core Hours",
|
||||||
|
"Total Accelerator Hours",
|
||||||
|
];
|
||||||
|
const rows = sortedRows.map((row) => [
|
||||||
|
row.id,
|
||||||
|
...(isUser ? [row?.name ?? ""] : []),
|
||||||
|
row.totalJobs,
|
||||||
|
row.shortJobs,
|
||||||
|
...(fetchRunning ? [row.totalCores, row.totalAccs] : []),
|
||||||
|
row.totalWalltime,
|
||||||
|
row.totalCoreHours,
|
||||||
|
row.totalAccHours,
|
||||||
|
]);
|
||||||
|
const csv = [header, ...rows]
|
||||||
|
.map((row) => row.map((v) => `"${String(v ?? "").replace(/"/g, '""')}"`).join(","))
|
||||||
|
.join("\n");
|
||||||
|
const blob = new Blob([csv], { type: "text/csv" });
|
||||||
|
const url = URL.createObjectURL(blob);
|
||||||
|
const a = document.createElement("a");
|
||||||
|
a.href = url;
|
||||||
|
a.download = `${type.toLowerCase()}s.csv`;
|
||||||
|
a.click();
|
||||||
|
URL.revokeObjectURL(url);
|
||||||
|
}
|
||||||
|
|
||||||
function changeSorting(newField) {
|
function changeSorting(newField) {
|
||||||
if (sorting.field == newField) {
|
if (sorting.field == newField) {
|
||||||
// Same Field, Change Direction
|
// Same Field, Change Direction
|
||||||
@@ -137,6 +174,14 @@
|
|||||||
PROJECT: 'project',
|
PROJECT: 'project',
|
||||||
}[type]}"
|
}[type]}"
|
||||||
/>
|
/>
|
||||||
|
<Button
|
||||||
|
color="success"
|
||||||
|
title="Export current view as CSV"
|
||||||
|
disabled={!$stats.data}
|
||||||
|
onclick={() => exportCsv()}
|
||||||
|
>
|
||||||
|
<Icon name="download" /> CSV
|
||||||
|
</Button>
|
||||||
</InputGroup>
|
</InputGroup>
|
||||||
</Col>
|
</Col>
|
||||||
<Col xs="12" md="7" lg="8" xl="9">
|
<Col xs="12" md="7" lg="8" xl="9">
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ export function init(extraInitQuery = "") {
|
|||||||
topology {
|
topology {
|
||||||
node
|
node
|
||||||
socket
|
socket
|
||||||
|
memoryDomain
|
||||||
core
|
core
|
||||||
accelerators { id }
|
accelerators { id }
|
||||||
}
|
}
|
||||||
@@ -238,7 +239,7 @@ export function groupByScope(jobMetrics) {
|
|||||||
const scopeGranularity = {
|
const scopeGranularity = {
|
||||||
node: 10,
|
node: 10,
|
||||||
socket: 5,
|
socket: 5,
|
||||||
memorydomain: 4,
|
memoryDomain: 4,
|
||||||
core: 3,
|
core: 3,
|
||||||
hwthread: 2,
|
hwthread: 2,
|
||||||
accelerator: 1
|
accelerator: 1
|
||||||
|
|||||||
@@ -107,6 +107,12 @@
|
|||||||
return pendingTableData
|
return pendingTableData
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const refinedStateData = $derived.by(() => {
|
||||||
|
return $statusQuery?.data?.nodeStates.
|
||||||
|
filter((e) => ['allocated', 'reserved', 'idle', 'mixed','down', 'unknown'].includes(e.state)).
|
||||||
|
sort((a, b) => b.count - a.count)
|
||||||
|
});
|
||||||
|
|
||||||
const refinedHealthData = $derived.by(() => {
|
const refinedHealthData = $derived.by(() => {
|
||||||
return $statusQuery?.data?.nodeStates.
|
return $statusQuery?.data?.nodeStates.
|
||||||
filter((e) => ['full', 'partial', 'failed'].includes(e.state)).
|
filter((e) => ['full', 'partial', 'failed'].includes(e.state)).
|
||||||
|
|||||||
Reference in New Issue
Block a user