Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev

This commit is contained in:
2026-03-03 15:41:51 +01:00
34 changed files with 2516 additions and 1257 deletions

View File

@@ -10,7 +10,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
@@ -90,16 +89,17 @@ func freeMetrics(rw http.ResponseWriter, r *http.Request) {
// @security ApiKeyAuth
// @router /write/ [post]
func writeMetrics(rw http.ResponseWriter, r *http.Request) {
bytes, err := io.ReadAll(r.Body)
rw.Header().Add("Content-Type", "application/json")
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
// Extract the "cluster" query parameter without allocating a url.Values map.
cluster := queryParam(r.URL.RawQuery, "cluster")
// Stream directly from the request body instead of copying it into a
// temporary buffer via io.ReadAll. The line-protocol decoder supports
// io.Reader natively, so this avoids the largest heap allocation.
ms := metricstore.GetMemoryStore()
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := metricstore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
dec := lineprotocol.NewDecoder(r.Body)
if err := metricstore.DecodeLine(dec, ms, cluster); err != nil {
cclog.Errorf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
return
@@ -107,6 +107,20 @@ func writeMetrics(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
}
// queryParam extracts a single query-parameter value from a raw query string
// without allocating a url.Values map. Returns "" if the key is not present.
func queryParam(raw, key string) string {
for raw != "" {
var kv string
kv, raw, _ = strings.Cut(raw, "&")
k, v, _ := strings.Cut(kv, "=")
if k == key {
return v
}
}
return ""
}
// handleDebug godoc
// @summary Debug endpoint
// @tags debug

View File

@@ -302,7 +302,7 @@ func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(http.StatusOK)
if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil {
if _, err := rw.Write(fmt.Appendf(nil, "Tagger %s started", name)); err != nil {
cclog.Errorf("Failed to write response: %v", err)
}
}

View File

@@ -74,11 +74,11 @@ func Init(rawConfig json.RawMessage) error {
dec := json.NewDecoder(bytes.NewReader(rawConfig))
dec.DisallowUnknownFields()
if err := dec.Decode(&configs); err != nil {
return fmt.Errorf("[METRICDISPATCH]> Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error())
return fmt.Errorf("[METRICDISPATCH]> External Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error())
}
if len(configs) == 0 {
return fmt.Errorf("[METRICDISPATCH]> No metric store configurations found in config file")
return fmt.Errorf("[METRICDISPATCH]> No external metric store configurations found in config file")
}
for _, config := range configs {

View File

@@ -126,6 +126,7 @@ func (ccms *CCMetricStore) buildQueries(
hwthreads = topology.Node
}
// Note: Expected exceptions will return as empty slices -> Continue
hostQueries, hostScopes := buildScopeQueries(
nativeScope, requestedScope,
remoteName, host.Hostname,
@@ -133,8 +134,9 @@ func (ccms *CCMetricStore) buildQueries(
resolution,
)
if len(hostQueries) == 0 && len(hostScopes) == 0 {
return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
// Note: Unexpected errors, such as unhandled cases, will return as nils -> Error
if hostQueries == nil && hostScopes == nil {
return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
queries = append(queries, hostQueries...)
@@ -237,7 +239,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
)
if len(nodeQueries) == 0 && len(nodeScopes) == 0 {
return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
queries = append(queries, nodeQueries...)
@@ -269,6 +271,7 @@ func buildScopeQueries(
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
if scope != schema.MetricScopeAccelerator {
// Expected Exception -> Continue -> Return Empty Slices
return queries, scopes
}
@@ -287,6 +290,7 @@ func buildScopeQueries(
// Accelerator -> Node
if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode {
if len(accelerators) == 0 {
// Expected Exception -> Continue -> Return Empty Slices
return queries, scopes
}
@@ -447,6 +451,7 @@ func buildScopeQueries(
socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
if err != nil {
cclog.Errorf("Error mapping memory domains to sockets, return unchanged: %v", err)
// Rare Error Case -> Still Continue -> Return Empty Slices
return queries, scopes
}
@@ -507,8 +512,8 @@ func buildScopeQueries(
return queries, scopes
}
// Unhandled case - return empty slices
return queries, scopes
// Unhandled Case -> Error -> Return nils
return nil, nil
}
// intToStringSlice converts a slice of integers to a slice of strings.

View File

@@ -123,7 +123,7 @@ type APIMetricData struct {
Max schema.Float `json:"max"` // Maximum value in time range
}
// NewCCMetricStore creates and initializes a new CCMetricStore client.
// NewCCMetricStore creates and initializes a new (external) CCMetricStore client.
// The url parameter should include the protocol and port (e.g., "http://localhost:8080").
// The token parameter is a JWT used for Bearer authentication; pass empty string if auth is disabled.
func NewCCMetricStore(url string, token string) *CCMetricStore {
@@ -356,7 +356,7 @@ func (ccms *CCMetricStore) LoadData(
if len(errors) != 0 {
/* Returns list for "partial errors" */
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return jobData, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > Errors: %s", strings.Join(errors, ", "))
}
return jobData, nil
}
@@ -393,6 +393,10 @@ func (ccms *CCMetricStore) LoadStats(
stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
for i, res := range resBody.Results {
if len(res) == 0 {
// No Data Found For Metric, Logged in FetchData to Warn
continue
}
query := req.Queries[i]
metric := query.Metric
data := res[0]
@@ -514,7 +518,7 @@ func (ccms *CCMetricStore) LoadScopedStats(
if len(errors) != 0 {
/* Returns list for "partial errors" */
return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return scopedJobStats, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > Errors: %s", strings.Join(errors, ", "))
}
return scopedJobStats, nil
}
@@ -562,6 +566,11 @@ func (ccms *CCMetricStore) LoadNodeData(
var errors []string
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody.Results {
if len(res) == 0 {
// No Data Found For Metric, Logged in FetchData to Warn
continue
}
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
@@ -572,7 +581,6 @@ func (ccms *CCMetricStore) LoadNodeData(
metric := query.Metric
qdata := res[0]
if qdata.Error != nil {
/* Build list for "partial errors", if any */
errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error))
}
@@ -604,7 +612,7 @@ func (ccms *CCMetricStore) LoadNodeData(
if len(errors) != 0 {
/* Returns list of "partial errors" */
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return data, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > Errors: %s", strings.Join(errors, ", "))
}
return data, nil
@@ -765,7 +773,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
if len(errors) != 0 {
/* Returns list of "partial errors" */
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return data, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > Errors: %s", strings.Join(errors, ", "))
}
return data, nil

View File

@@ -64,9 +64,11 @@ func (t *AppTagger) scanApp(f *os.File, fns string) {
if line == "" {
continue
}
re, err := regexp.Compile(line)
// Wrap pattern to skip comment lines: match only if not preceded by # on the same line
wrapped := `(?m)^[^#]*` + line
re, err := regexp.Compile(wrapped)
if err != nil {
cclog.Errorf("invalid regex pattern '%s' in %s: %v", line, fns, err)
cclog.Errorf("invalid regex pattern '%s' (wrapped: '%s') in %s: %v", line, wrapped, fns, err)
continue
}
ai.patterns = append(ai.patterns, re)