Mirror of https://github.com/ClusterCockpit/cc-backend
synced 2026-04-15 20:07:30 +02:00
Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev
@@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
 		}
 	}

-	id, err := api.JobRepository.Start(&req)
+	// When tags are present, insert directly into the job table so that the
+	// returned ID can be used with AddTagOrCreate (which queries the job table).
+	// Jobs without tags use the cache path as before.
+	var id int64
+	if len(req.Tags) > 0 {
+		id, err = api.JobRepository.StartDirect(&req)
+	} else {
+		id, err = api.JobRepository.Start(&req)
+	}
 	if err != nil {
 		handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
 		return
internal/api/log.go (new file, 165 lines)
@@ -0,0 +1,165 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package api
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"os/exec"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
+	"github.com/ClusterCockpit/cc-lib/v2/schema"
+)
+
+type LogEntry struct {
+	Timestamp string `json:"timestamp"`
+	Priority  int    `json:"priority"`
+	Message   string `json:"message"`
+	Unit      string `json:"unit"`
+}
+
+var safePattern = regexp.MustCompile(`^[a-zA-Z0-9 :\-\.]+$`)
+
+func (api *RestAPI) getJournalLog(rw http.ResponseWriter, r *http.Request) {
+	user := repository.GetUserFromContext(r.Context())
+	if !user.HasRole(schema.RoleAdmin) {
+		handleError(fmt.Errorf("only admins are allowed to view logs"), http.StatusForbidden, rw)
+		return
+	}
+
+	since := r.URL.Query().Get("since")
+	if since == "" {
+		since = "1 hour ago"
+	}
+	if !safePattern.MatchString(since) {
+		handleError(fmt.Errorf("invalid 'since' parameter"), http.StatusBadRequest, rw)
+		return
+	}
+
+	lines := 200
+	if l := r.URL.Query().Get("lines"); l != "" {
+		n, err := strconv.Atoi(l)
+		if err != nil || n < 1 {
+			handleError(fmt.Errorf("invalid 'lines' parameter"), http.StatusBadRequest, rw)
+			return
+		}
+		if n > 1000 {
+			n = 1000
+		}
+		lines = n
+	}
+
+	unit := config.Keys.SystemdUnit
+	if unit == "" {
+		unit = "clustercockpit.service"
+	}
+
+	args := []string{
+		"--output=json",
+		"--no-pager",
+		"-n", fmt.Sprintf("%d", lines),
+		"--since", since,
+		"-u", unit,
+	}
+
+	if level := r.URL.Query().Get("level"); level != "" {
+		n, err := strconv.Atoi(level)
+		if err != nil || n < 0 || n > 7 {
+			handleError(fmt.Errorf("invalid 'level' parameter (must be 0-7)"), http.StatusBadRequest, rw)
+			return
+		}
+		args = append(args, "--priority", fmt.Sprintf("%d", n))
+	}
+
+	if search := r.URL.Query().Get("search"); search != "" {
+		if !safePattern.MatchString(search) {
+			handleError(fmt.Errorf("invalid 'search' parameter"), http.StatusBadRequest, rw)
+			return
+		}
+		args = append(args, "--grep", search)
+	}
+
+	cclog.Debugf("calling journalctl with %s", strings.Join(args, " "))
+	cmd := exec.CommandContext(r.Context(), "journalctl", args...)
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		handleError(fmt.Errorf("failed to create pipe: %w", err), http.StatusInternalServerError, rw)
+		return
+	}
+
+	if err := cmd.Start(); err != nil {
+		handleError(fmt.Errorf("failed to start journalctl: %w", err), http.StatusInternalServerError, rw)
+		return
+	}
+
+	entries := make([]LogEntry, 0, lines)
+	scanner := bufio.NewScanner(stdout)
+	for scanner.Scan() {
+		var raw map[string]any
+		if err := json.Unmarshal(scanner.Bytes(), &raw); err != nil {
+			cclog.Debugf("error unmarshal log output: %v", err)
+			continue
+		}
+
+		priority := 6 // default info
+		if p, ok := raw["PRIORITY"]; ok {
+			switch v := p.(type) {
+			case string:
+				if n, err := strconv.Atoi(v); err == nil {
+					priority = n
+				}
+			case float64:
+				priority = int(v)
+			}
+		}
+
+		msg := ""
+		if m, ok := raw["MESSAGE"]; ok {
+			if s, ok := m.(string); ok {
+				msg = s
+			}
+		}
+
+		ts := ""
+		if t, ok := raw["__REALTIME_TIMESTAMP"]; ok {
+			if s, ok := t.(string); ok {
+				ts = s
+			}
+		}
+
+		unitName := ""
+		if u, ok := raw["_SYSTEMD_UNIT"]; ok {
+			if s, ok := u.(string); ok {
+				unitName = s
+			}
+		}
+
+		entries = append(entries, LogEntry{
+			Timestamp: ts,
+			Priority:  priority,
+			Message:   msg,
+			Unit:      unitName,
+		})
+	}
+
+	if err := cmd.Wait(); err != nil {
+		// journalctl returns exit code 1 when --grep matches nothing
+		if len(entries) == 0 {
+			cclog.Debugf("journalctl exited with: %v", err)
+		}
+	}
+
+	rw.Header().Set("Content-Type", "application/json")
+	if err := json.NewEncoder(rw).Encode(entries); err != nil {
+		cclog.Errorf("Failed to encode log entries: %v", err)
+	}
+}
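
Note: a minimal sketch (not part of this commit) of how a client might consume the new endpoint. The host and the "/frontend/logs/" mount prefix are assumptions, and the endpoint requires an admin session; the query parameters match getJournalLog above.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
)

// LogEntry mirrors the struct in internal/api/log.go.
type LogEntry struct {
	Timestamp string `json:"timestamp"`
	Priority  int    `json:"priority"`
	Message   string `json:"message"`
	Unit      string `json:"unit"`
}

func main() {
	// Parameters as validated by getJournalLog: 'since' must match
	// safePattern, 'lines' is capped at 1000, 'level' must be 0-7.
	q := url.Values{}
	q.Set("since", "2 hours ago")
	q.Set("lines", "100")
	q.Set("level", "4") // warnings and worse

	// Base URL and auth handling are illustrative assumptions.
	resp, err := http.Get("https://cc.example.org/frontend/logs/?" + q.Encode())
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var entries []LogEntry
	if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
		log.Fatal(err)
	}
	for _, e := range entries {
		fmt.Println(e.Timestamp, e.Priority, e.Message)
	}
}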
@@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) {
 		}
 	}

-	id, err := api.JobRepository.Start(&req)
+	// When tags are present, insert directly into the job table so that the
+	// returned ID can be used with AddTagOrCreate (which queries the job table).
+	var id int64
+	if len(req.Tags) > 0 {
+		id, err = api.JobRepository.StartDirect(&req)
+	} else {
+		id, err = api.JobRepository.Start(&req)
+	}
 	if err != nil {
 		cclog.Errorf("NATS start job: insert into database failed: %v", err)
 		return
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"time"

+	"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
@@ -77,25 +78,37 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
 	}
 	requestReceived := time.Now().Unix()
 	repo := repository.GetNodeRepository()
-	ms := metricstore.GetMemoryStore()

 	m := make(map[string][]string)
+	metricNames := make(map[string][]string)
 	healthResults := make(map[string]metricstore.HealthCheckResult)

 	startMs := time.Now()

+	// Step 1: Build nodeList and metricList per subcluster
 	for _, node := range req.Nodes {
 		if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
 			m[sc] = append(m[sc], node.Hostname)
 		}
 	}

-	for sc, nl := range m {
+	for sc := range m {
 		if sc != "" {
 			metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
-			metricNames := metricListToNames(metricList)
-			if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
-				maps.Copy(healthResults, results)
-			}
+			metricNames[sc] = metricListToNames(metricList)
 		}
 	}

+	// Step 2: Determine which metric store to query and perform health check
+	healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
+	if err != nil {
+		cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
+	} else {
+		for sc, nl := range m {
+			if sc != "" {
+				if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
+					maps.Copy(healthResults, results)
+				}
+			}
+		}
+	}
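
Note: metricListToNames is referenced above but is not part of this diff. Under the assumption that it simply collects the configured metric names for a subcluster, its shape would be roughly:

// Hypothetical sketch of the helper used above (not in this commit). The
// element type returned by archive.GetMetricConfigSubCluster is an assumption.
func metricListToNames(metricList []*schema.MetricConfig) []string {
	names := make([]string, 0, len(metricList))
	for _, mc := range metricList {
		names = append(names, mc.Name)
	}
	return names
}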
@@ -158,6 +158,7 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) {
 // MountFrontendAPIRoutes registers frontend-specific API endpoints.
 // These routes support JWT generation and user configuration updates with session authentication.
 func (api *RestAPI) MountFrontendAPIRoutes(r chi.Router) {
+	r.Get("/logs/", api.getJournalLog)
 	// Settings Frontend Uses SessionAuth
 	if api.Authentication != nil {
 		r.Get("/jwt/", api.getJWT)
@@ -49,7 +49,7 @@ func TestRateLimiterBehavior(t *testing.T) {
 	limiter := getIPUserLimiter(ip, username)

 	// Should allow first 5 attempts
-	for i := 0; i < 5; i++ {
+	for i := range 5 {
 		if !limiter.Allow() {
 			t.Errorf("Request %d should be allowed within rate limit", i+1)
 		}
@@ -9,6 +9,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"strings"

 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -80,11 +81,12 @@ func extractNameFromClaims(claims jwt.MapClaims) string {
 		return ""
 	}

-	name := fmt.Sprintf("%v", vals[0])
+	var name strings.Builder
+	name.WriteString(fmt.Sprintf("%v", vals[0]))
 	for i := 1; i < len(vals); i++ {
-		name += fmt.Sprintf(" %v", vals[i])
+		name.WriteString(fmt.Sprintf(" %v", vals[i]))
 	}
-	return name
+	return name.String()
 }
@@ -72,14 +72,17 @@ type ProgramConfig struct {
 	// If exists, will enable dynamic zoom in frontend metric plots using the configured values
 	EnableResampling *ResampleConfig `json:"resampling"`

+	// Systemd unit name for log viewer (default: "clustercockpit")
+	SystemdUnit string `json:"systemd-unit"`
+
 	// Node state retention configuration
 	NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`
 }

 type NodeStateRetention struct {
-	Policy     string `json:"policy"`      // "delete" or "parquet"
-	Age        int    `json:"age"`         // hours, default 24
-	TargetKind string `json:"target-kind"` // "file" or "s3"
+	Policy         string `json:"policy"`      // "delete" or "move"
+	Age            int    `json:"age"`         // hours, default 24
+	TargetKind     string `json:"target-kind"` // "file" or "s3"
 	TargetPath     string `json:"target-path"`
 	TargetEndpoint string `json:"target-endpoint"`
 	TargetBucket   string `json:"target-bucket"`
@@ -77,24 +77,18 @@ var configSchema = `
 		"type": "integer"
 	},
 	"emission-constant": {
-		"description": ".",
+		"description": "Energy mix CO2 emission constant [g/kWh]. If set, displays estimated CO2 emission for jobs.",
 		"type": "integer"
 	},
-	"cron-frequency": {
-		"description": "Frequency of cron job workers.",
-		"type": "object",
-		"properties": {
-			"duration-worker": {
-				"description": "Duration Update Worker [Defaults to '5m']",
-				"type": "string"
-			},
-			"footprint-worker": {
-				"description": "Metric-Footprint Update Worker [Defaults to '10m']",
-				"type": "string"
-			}
-		}
-	},
 	"machine-state-dir": {
 		"description": "Where to store MachineState files.",
 		"type": "string"
 	},
-	"enable-resampling": {
+	"systemd-unit": {
+		"description": "Systemd unit name for log viewer (default: 'clustercockpit').",
+		"type": "string"
+	},
+	"resampling": {
 		"description": "Enable dynamic zoom in frontend metric plots.",
 		"type": "object",
 		"properties": {
@@ -136,9 +130,9 @@ var configSchema = `
 		"type": "object",
 		"properties": {
 			"policy": {
-				"description": "Retention policy: 'delete' to remove old rows, 'parquet' to archive then delete.",
+				"description": "Retention policy: 'delete' to remove old rows, 'move' to archive to Parquet then delete.",
 				"type": "string",
-				"enum": ["delete", "parquet"]
+				"enum": ["delete", "move"]
 			},
 			"age": {
 				"description": "Retention age in hours (default: 24).",
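
Note: for reference, a config fragment exercising the two schema additions above might look like the following sketch; the target path is illustrative, and NodeStateRetention in config.go lists the full set of target-* keys.

{
  "systemd-unit": "clustercockpit.service",
  "nodestate-retention": {
    "policy": "move",
    "age": 24,
    "target-kind": "file",
    "target-path": "./var/nodestate-archive"
  }
}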
@@ -287,6 +287,7 @@ type ComplexityRoot struct {
 		Cluster       func(childComplexity int) int
 		CpusAllocated func(childComplexity int) int
 		GpusAllocated func(childComplexity int) int
+		HealthData    func(childComplexity int) int
 		HealthState   func(childComplexity int) int
 		Hostname      func(childComplexity int) int
 		ID            func(childComplexity int) int
@@ -347,6 +348,7 @@ type ComplexityRoot struct {
 		NodeStates      func(childComplexity int, filter []*model.NodeFilter) int
 		NodeStatesTimed func(childComplexity int, filter []*model.NodeFilter, typeArg string) int
 		Nodes           func(childComplexity int, filter []*model.NodeFilter, order *model.OrderByInput) int
+		NodesWithMeta   func(childComplexity int, filter []*model.NodeFilter, order *model.OrderByInput) int
 		RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int
 		ScopedJobStats  func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int
 		Tags            func(childComplexity int) int
@@ -369,7 +371,7 @@ type ComplexityRoot struct {
 	Series struct {
 		Data       func(childComplexity int) int
 		Hostname   func(childComplexity int) int
-		Id         func(childComplexity int) int
+		ID         func(childComplexity int) int
 		Statistics func(childComplexity int) int
 	}
@@ -476,6 +478,7 @@ type NodeResolver interface {
 	SchedulerState(ctx context.Context, obj *schema.Node) (schema.SchedulerState, error)
 	HealthState(ctx context.Context, obj *schema.Node) (string, error)
 	MetaData(ctx context.Context, obj *schema.Node) (any, error)
+	HealthData(ctx context.Context, obj *schema.Node) (any, error)
 }
 type QueryResolver interface {
 	Clusters(ctx context.Context) ([]*schema.Cluster, error)
@@ -485,6 +488,7 @@ type QueryResolver interface {
 	AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error)
 	Node(ctx context.Context, id string) (*schema.Node, error)
 	Nodes(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error)
+	NodesWithMeta(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error)
 	NodeStates(ctx context.Context, filter []*model.NodeFilter) ([]*model.NodeStates, error)
 	NodeStatesTimed(ctx context.Context, filter []*model.NodeFilter, typeArg string) ([]*model.NodeStatesTimed, error)
 	Job(ctx context.Context, id string) (*schema.Job, error)
@@ -1452,6 +1456,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
 		}

 		return e.complexity.Node.GpusAllocated(childComplexity), true
+	case "Node.healthData":
+		if e.complexity.Node.HealthData == nil {
+			break
+		}
+
+		return e.complexity.Node.HealthData(childComplexity), true
 	case "Node.healthState":
 		if e.complexity.Node.HealthState == nil {
 			break
@@ -1785,6 +1795,17 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
 		}

 		return e.complexity.Query.Nodes(childComplexity, args["filter"].([]*model.NodeFilter), args["order"].(*model.OrderByInput)), true
+	case "Query.nodesWithMeta":
+		if e.complexity.Query.NodesWithMeta == nil {
+			break
+		}
+
+		args, err := ec.field_Query_nodesWithMeta_args(ctx, rawArgs)
+		if err != nil {
+			return 0, false
+		}
+
+		return e.complexity.Query.NodesWithMeta(childComplexity, args["filter"].([]*model.NodeFilter), args["order"].(*model.OrderByInput)), true
 	case "Query.rooflineHeatmap":
 		if e.complexity.Query.RooflineHeatmap == nil {
 			break
@@ -1882,11 +1903,11 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin

 		return e.complexity.Series.Hostname(childComplexity), true
 	case "Series.id":
-		if e.complexity.Series.Id == nil {
+		if e.complexity.Series.ID == nil {
 			break
 		}

-		return e.complexity.Series.Id(childComplexity), true
+		return e.complexity.Series.ID(childComplexity), true
 	case "Series.statistics":
 		if e.complexity.Series.Statistics == nil {
 			break
@@ -2302,6 +2323,7 @@ type Node {
   schedulerState: SchedulerState!
   healthState: MonitoringState!
   metaData: Any
+  healthData: Any
 }

 type NodeStates {
@@ -2611,6 +2633,7 @@ type Query {
   ## Node Queries New
   node(id: ID!): Node
   nodes(filter: [NodeFilter!], order: OrderByInput): NodeStateResultList!
+  nodesWithMeta(filter: [NodeFilter!], order: OrderByInput): NodeStateResultList!
   nodeStates(filter: [NodeFilter!]): [NodeStates!]!
   nodeStatesTimed(filter: [NodeFilter!], type: String!): [NodeStatesTimed!]!
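
Note: a query against the new field could look like the following sketch. items and count come from NodeStateResultList; metaData and healthData are exactly the subfields the extra resolver (see NodesWithMeta below) exists to serve.

query {
  nodesWithMeta {
    items {
      hostname
      healthState
      metaData
      healthData
    }
    count
  }
}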
@@ -3268,6 +3291,22 @@ func (ec *executionContext) field_Query_node_args(ctx context.Context, rawArgs m
 	return args, nil
 }

+func (ec *executionContext) field_Query_nodesWithMeta_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
+	var err error
+	args := map[string]any{}
+	arg0, err := graphql.ProcessArgField(ctx, rawArgs, "filter", ec.unmarshalONodeFilter2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐNodeFilterᚄ)
+	if err != nil {
+		return nil, err
+	}
+	args["filter"] = arg0
+	arg1, err := graphql.ProcessArgField(ctx, rawArgs, "order", ec.unmarshalOOrderByInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐOrderByInput)
+	if err != nil {
+		return nil, err
+	}
+	args["order"] = arg1
+	return args, nil
+}
+
 func (ec *executionContext) field_Query_nodes_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
 	var err error
 	args := map[string]any{}
@@ -8258,6 +8297,35 @@ func (ec *executionContext) fieldContext_Node_metaData(_ context.Context, field
 	return fc, nil
 }

+func (ec *executionContext) _Node_healthData(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
+	return graphql.ResolveField(
+		ctx,
+		ec.OperationContext,
+		field,
+		ec.fieldContext_Node_healthData,
+		func(ctx context.Context) (any, error) {
+			return ec.resolvers.Node().HealthData(ctx, obj)
+		},
+		nil,
+		ec.marshalOAny2interface,
+		true,
+		false,
+	)
+}
+
+func (ec *executionContext) fieldContext_Node_healthData(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "Node",
+		Field:      field,
+		IsMethod:   true,
+		IsResolver: true,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type Any does not have child fields")
+		},
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _NodeMetrics_host(ctx context.Context, field graphql.CollectedField, obj *model.NodeMetrics) (ret graphql.Marshaler) {
 	return graphql.ResolveField(
 		ctx,
@@ -8428,6 +8496,8 @@ func (ec *executionContext) fieldContext_NodeStateResultList_items(_ context.Con
 			case "healthState":
 				return ec.fieldContext_Node_healthState(ctx, field)
 			case "metaData":
 				return ec.fieldContext_Node_metaData(ctx, field)
+			case "healthData":
+				return ec.fieldContext_Node_healthData(ctx, field)
 			}
 			return nil, fmt.Errorf("no field named %q was found under type Node", field.Name)
 		},
@@ -9053,6 +9123,8 @@ func (ec *executionContext) fieldContext_Query_node(ctx context.Context, field g
 			case "healthState":
 				return ec.fieldContext_Node_healthState(ctx, field)
 			case "metaData":
 				return ec.fieldContext_Node_metaData(ctx, field)
+			case "healthData":
+				return ec.fieldContext_Node_healthData(ctx, field)
 			}
 			return nil, fmt.Errorf("no field named %q was found under type Node", field.Name)
 		},
@@ -9118,6 +9190,53 @@ func (ec *executionContext) fieldContext_Query_nodes(ctx context.Context, field
 	return fc, nil
 }

+func (ec *executionContext) _Query_nodesWithMeta(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
+	return graphql.ResolveField(
+		ctx,
+		ec.OperationContext,
+		field,
+		ec.fieldContext_Query_nodesWithMeta,
+		func(ctx context.Context) (any, error) {
+			fc := graphql.GetFieldContext(ctx)
+			return ec.resolvers.Query().NodesWithMeta(ctx, fc.Args["filter"].([]*model.NodeFilter), fc.Args["order"].(*model.OrderByInput))
+		},
+		nil,
+		ec.marshalNNodeStateResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐNodeStateResultList,
+		true,
+		true,
+	)
+}
+
+func (ec *executionContext) fieldContext_Query_nodesWithMeta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "Query",
+		Field:      field,
+		IsMethod:   true,
+		IsResolver: true,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			switch field.Name {
+			case "items":
+				return ec.fieldContext_NodeStateResultList_items(ctx, field)
+			case "count":
+				return ec.fieldContext_NodeStateResultList_count(ctx, field)
+			}
+			return nil, fmt.Errorf("no field named %q was found under type NodeStateResultList", field.Name)
+		},
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			err = ec.Recover(ctx, r)
+			ec.Error(ctx, err)
+		}
+	}()
+	ctx = graphql.WithFieldContext(ctx, fc)
+	if fc.Args, err = ec.field_Query_nodesWithMeta_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
+		ec.Error(ctx, err)
+		return fc, err
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _Query_nodeStates(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
 	return graphql.ResolveField(
 		ctx,
@@ -15744,6 +15863,39 @@ func (ec *executionContext) _Node(ctx context.Context, sel ast.SelectionSet, obj
 				continue
 			}

 			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
+		case "healthData":
+			field := field
+
+			innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) {
+				defer func() {
+					if r := recover(); r != nil {
+						ec.Error(ctx, ec.Recover(ctx, r))
+					}
+				}()
+				res = ec._Node_healthData(ctx, field, obj)
+				return res
+			}
+
+			if field.Deferrable != nil {
+				dfs, ok := deferred[field.Deferrable.Label]
+				di := 0
+				if ok {
+					dfs.AddField(field)
+					di = len(dfs.Values) - 1
+				} else {
+					dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
+					deferred[field.Deferrable.Label] = dfs
+				}
+				dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
+					return innerFunc(ctx, dfs)
+				})
+
+				// don't run the out.Concurrently() call below
+				out.Values[i] = graphql.Null
+				continue
+			}
+
+			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
 		default:
 			panic("unknown field " + strconv.Quote(field.Name))
@@ -16171,6 +16323,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
 					func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
 			}

 			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) })
+		case "nodesWithMeta":
+			field := field
+
+			innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
+				defer func() {
+					if r := recover(); r != nil {
+						ec.Error(ctx, ec.Recover(ctx, r))
+					}
+				}()
+				res = ec._Query_nodesWithMeta(ctx, field)
+				if res == graphql.Null {
+					atomic.AddUint32(&fs.Invalids, 1)
+				}
+				return res
+			}
+
+			rrm := func(ctx context.Context) graphql.Marshaler {
+				return ec.OperationContext.RootResolverMiddleware(ctx,
+					func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
+			}
+
+			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) })
 		case "nodeStates":
 			field := field
@@ -318,18 +318,39 @@ func (r *nodeResolver) SchedulerState(ctx context.Context, obj *schema.Node) (sc
 	if obj.NodeState != "" {
 		return obj.NodeState, nil
 	} else {
-		return "", fmt.Errorf("no SchedulerState (NodeState) on Object")
+		return "", fmt.Errorf("resolver: no SchedulerState (NodeState) on node object")
 	}
 }

 // HealthState is the resolver for the healthState field.
 func (r *nodeResolver) HealthState(ctx context.Context, obj *schema.Node) (string, error) {
-	panic(fmt.Errorf("not implemented: HealthState - healthState"))
+	if obj.HealthState != "" {
+		return string(obj.HealthState), nil
+	} else {
+		return "", fmt.Errorf("resolver: no HealthState (NodeState) on node object")
+	}
 }

 // MetaData is the resolver for the metaData field.
 func (r *nodeResolver) MetaData(ctx context.Context, obj *schema.Node) (any, error) {
-	panic(fmt.Errorf("not implemented: MetaData - metaData"))
+	if obj.MetaData != nil {
+		return obj.MetaData, nil
+	} else {
+		cclog.Debug("resolver: no MetaData (NodeState) on node object")
+		emptyMeta := make(map[string]string, 0)
+		return emptyMeta, nil
+	}
+}
+
+// HealthData is the resolver for the healthData field.
+func (r *nodeResolver) HealthData(ctx context.Context, obj *schema.Node) (any, error) {
+	if obj.HealthData != nil {
+		return obj.HealthData, nil
+	} else {
+		cclog.Debug("resolver: no HealthData (NodeState) on node object")
+		emptyHealth := make(map[string][]string, 0)
+		return emptyHealth, nil
+	}
 }

 // Clusters is the resolver for the clusters field.
@@ -398,6 +419,15 @@ func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, o
 	return &model.NodeStateResultList{Items: nodes, Count: &count}, err
 }

+// NodesWithMeta is the resolver for the nodesWithMeta field.
+func (r *queryResolver) NodesWithMeta(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error) {
+	// Why Extra Handler? -> graphql.CollectAllFields(ctx) only returns toplevel fields (i.e.: items, count), and not subfields like item.metaData
+	repo := repository.GetNodeRepository()
+	nodes, err := repo.QueryNodesWithMeta(ctx, filter, nil, order) // Ignore Paging, Order Unused
+	count := len(nodes)
+	return &model.NodeStateResultList{Items: nodes, Count: &count}, err
+}
+
 // NodeStates is the resolver for the nodeStates field.
 func (r *queryResolver) NodeStates(ctx context.Context, filter []*model.NodeFilter) ([]*model.NodeStates, error) {
 	repo := repository.GetNodeRepository()
@@ -38,7 +38,7 @@ import (
 func HandleImportFlag(flag string) error {
 	r := repository.GetJobRepository()

-	for _, pair := range strings.Split(flag, ",") {
+	for pair := range strings.SplitSeq(flag, ",") {
 		files := strings.Split(pair, ":")
 		if len(files) != 2 {
 			return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
@@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error {
 			return err
 		}

-		id, err := r.InsertJob(&job)
+		id, err := r.InsertJobDirect(&job)
 		if err != nil {
 			cclog.Warn("Error while job db insert")
 			return err
@@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) {
 		}

 		result := readResult(t, testname)
-		job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime)
+		job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -52,6 +52,11 @@ type MetricDataRepository interface {
 		resolution int,
 		from, to time.Time,
 		ctx context.Context) (map[string]schema.JobData, error)
+
+	// HealthCheck evaluates the monitoring state for a set of nodes against expected metrics.
+	HealthCheck(cluster string,
+		nodes []string,
+		metrics []string) (map[string]metricstore.HealthCheckResult, error)
 }

 type CCMetricStoreConfig struct {
@@ -110,3 +115,9 @@ func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository,

 	return repo, nil
 }
+
+// GetHealthCheckRepo returns the MetricDataRepository for performing health checks on a cluster.
+// It uses the same fallback logic as GetMetricDataRepo: cluster → wildcard → internal.
+func GetHealthCheckRepo(cluster string) (MetricDataRepository, error) {
+	return GetMetricDataRepo(cluster, "")
+}
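
Note: a sketch of a call site (cluster, node, and metric names are illustrative), mirroring the usage in updateNodeStates above — resolve the backend once, then health-check each node list:

	healthRepo, err := metricdispatch.GetHealthCheckRepo("fritz")
	if err != nil {
		cclog.Warnf("no metric store for cluster, skipping health check: %v", err)
		return
	}
	// Merge per-node results into the shared map, as updateNodeStates does.
	if results, err := healthRepo.HealthCheck("fritz", []string{"node01", "node02"}, []string{"cpu_load"}); err == nil {
		maps.Copy(healthResults, results)
	}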
@@ -63,6 +63,7 @@ import (
 	"time"

 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
 	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
 )
@@ -653,6 +654,54 @@ func (ccms *CCMetricStore) LoadNodeListData(
 	return data, nil
 }

+// HealthCheck queries the external cc-metric-store's health check endpoint.
+// It sends a HealthCheckReq as the request body to /api/healthcheck and
+// returns the per-node health check results.
+func (ccms *CCMetricStore) HealthCheck(cluster string,
+	nodes []string, metrics []string,
+) (map[string]metricstore.HealthCheckResult, error) {
+	req := metricstore.HealthCheckReq{
+		Cluster:     cluster,
+		Nodes:       nodes,
+		MetricNames: metrics,
+	}
+
+	buf := &bytes.Buffer{}
+	if err := json.NewEncoder(buf).Encode(req); err != nil {
+		cclog.Errorf("Error while encoding health check request body: %s", err.Error())
+		return nil, err
+	}
+
+	endpoint := fmt.Sprintf("%s/api/healthcheck", ccms.url)
+	httpReq, err := http.NewRequest(http.MethodGet, endpoint, buf)
+	if err != nil {
+		cclog.Errorf("Error while building health check request: %s", err.Error())
+		return nil, err
+	}
+	if ccms.jwt != "" {
+		httpReq.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
+	}
+
+	res, err := ccms.client.Do(httpReq)
+	if err != nil {
+		cclog.Errorf("Error while performing health check request: %s", err.Error())
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("'%s': HTTP Status: %s", endpoint, res.Status)
+	}
+
+	var results map[string]metricstore.HealthCheckResult
+	if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&results); err != nil {
+		cclog.Errorf("Error while decoding health check response: %s", err.Error())
+		return nil, err
+	}
+
+	return results, nil
+}
+
 // sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
 // Regular float64 values cannot be JSONed when NaN.
 func sanitizeStats(avg, min, max *schema.Float) {
@@ -844,6 +844,8 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
 		query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
 	}

+	query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")
+
 	rows, err := query.RunWith(r.stmtCache).Query()
 	if err != nil {
 		cclog.Errorf("Error while running FindJobsBetween query: %v", err)
@@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job (
 	:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
 );`

+// InsertJobDirect inserts a job directly into the job table (not job_cache).
+// Use this when the returned ID will be used for operations on the job table
+// (e.g., adding tags), or for imported jobs that are already completed.
+func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) {
+	r.Mutex.Lock()
+	defer r.Mutex.Unlock()
+
+	res, err := r.DB.NamedExec(NamedJobInsert, job)
+	if err != nil {
+		cclog.Warn("Error while NamedJobInsert (direct)")
+		return 0, err
+	}
+	id, err := res.LastInsertId()
+	if err != nil {
+		cclog.Warn("Error while getting last insert ID (direct)")
+		return 0, err
+	}
+
+	return id, nil
+}
+
 func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
 	r.Mutex.Lock()
 	defer r.Mutex.Unlock()
@@ -85,6 +106,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 		return nil, err
 	}

+	// Resolve correct job.id from the job table. The IDs read from job_cache
+	// are from a different auto-increment sequence and must not be used to
+	// query the job table.
+	for _, job := range jobs {
+		var newID int64
+		if err := sq.Select("job.id").From("job").
+			Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?",
+				job.JobID, job.Cluster, job.StartTime).
+			RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil {
+			cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v",
+				job.JobID, job.Cluster, err)
+			continue
+		}
+		job.ID = &newID
+	}
+
 	return jobs, nil
 }
@@ -132,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
 	return r.InsertJob(job)
 }

+// StartDirect inserts a new job directly into the job table (not job_cache).
+// Use this when the returned ID will immediately be used for job table
+// operations such as adding tags.
+func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) {
+	job.RawFootprint, err = json.Marshal(job.Footprint)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
+	}
+
+	job.RawResources, err = json.Marshal(job.Resources)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
+	}
+
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
+	}
+
+	return r.InsertJobDirect(job)
+}
+
 // Stop updates the job with the database id jobId using the provided arguments.
 func (r *JobRepository) Stop(
 	jobID int64,
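
Note: a hedged sketch of the intended calling pattern (tag type/name and variable names are placeholders). Because StartDirect writes to the job table, the returned ID satisfies the jobtag foreign key immediately, without waiting for a cache sync:

	id, err := jobRepo.StartDirect(&job)
	if err != nil {
		return err
	}
	// id references the job table (not job_cache), so tagging works right away.
	if err := jobRepo.ImportTag(id, "app", "gromacs", "global"); err != nil {
		return err
	}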
@@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) {
 		require.NoError(t, err)
 	})

+	t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) {
+		// Ensure cache is empty first
+		_, err := r.DB.Exec("DELETE FROM job_cache")
+		require.NoError(t, err)
+
+		// Insert a job into job_cache
+		job := createTestJob(999015, "testcluster")
+		cacheID, err := r.Start(job)
+		require.NoError(t, err)
+
+		// Sync jobs
+		jobs, err := r.SyncJobs()
+		require.NoError(t, err)
+		require.Equal(t, 1, len(jobs))
+
+		// The returned ID must refer to the job table, not job_cache
+		var jobTableID int64
+		err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?",
+			jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID)
+		require.NoError(t, err)
+		assert.Equal(t, jobTableID, *jobs[0].ID,
+			"returned ID should match the job table row, not the cache ID (%d)", cacheID)
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster)
+		require.NoError(t, err)
+	})
+
 	t.Run("sync with empty cache returns empty list", func(t *testing.T) {
 		// Ensure cache is empty
 		_, err := r.DB.Exec("DELETE FROM job_cache")
@@ -500,3 +528,80 @@ func TestSyncJobs(t *testing.T) {
 		assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty")
 	})
 }
+
+func TestInsertJobDirect(t *testing.T) {
+	r := setup(t)
+
+	t.Run("inserts into job table not cache", func(t *testing.T) {
+		job := createTestJob(999020, "testcluster")
+		job.RawResources, _ = json.Marshal(job.Resources)
+		job.RawFootprint, _ = json.Marshal(job.Footprint)
+		job.RawMetaData, _ = json.Marshal(job.MetaData)
+
+		id, err := r.InsertJobDirect(job)
+		require.NoError(t, err, "InsertJobDirect should succeed")
+		assert.Greater(t, id, int64(0), "Should return valid insert ID")
+
+		// Verify job is in job table
+		var count int
+		err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count)
+		require.NoError(t, err)
+		assert.Equal(t, 1, count, "Job should be in job table")
+
+		// Verify job is NOT in job_cache
+		err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? AND cluster = ?",
+			job.JobID, job.Cluster).Scan(&count)
+		require.NoError(t, err)
+		assert.Equal(t, 0, count, "Job should NOT be in job_cache")
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+
+	t.Run("returned ID works for tag operations", func(t *testing.T) {
+		job := createTestJob(999021, "testcluster")
+		job.RawResources, _ = json.Marshal(job.Resources)
+		job.RawFootprint, _ = json.Marshal(job.Footprint)
+		job.RawMetaData, _ = json.Marshal(job.MetaData)

+		id, err := r.InsertJobDirect(job)
+		require.NoError(t, err)
+
+		// Adding a tag using the returned ID should succeed (FK constraint on jobtag)
+		err = r.ImportTag(id, "test_type", "test_name", "global")
+		require.NoError(t, err, "ImportTag should succeed with direct insert ID")
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id)
+		require.NoError(t, err)
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+}
+
+func TestStartDirect(t *testing.T) {
+	r := setup(t)
+
+	t.Run("inserts into job table with JSON encoding", func(t *testing.T) {
+		job := createTestJob(999022, "testcluster")
+
+		id, err := r.StartDirect(job)
+		require.NoError(t, err, "StartDirect should succeed")
+		assert.Greater(t, id, int64(0))
+
+		// Verify job is in job table with encoded JSON
+		var rawResources []byte
+		err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources)
+		require.NoError(t, err)
+
+		var resources []*schema.Resource
+		err = json.Unmarshal(rawResources, &resources)
+		require.NoError(t, err, "Resources should be valid JSON")
+		assert.Equal(t, "node01", resources[0].Hostname)
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+}
@@ -154,16 +154,14 @@ func (r *NodeRepository) GetNodeByID(id int64, withMeta bool) (*schema.Node, err
 		return nil, err
 	}

-	// NEEDS METADATA BY ID
-	// if withMeta {
-	// 	var err error
-	// 	var meta map[string]string
-	// 	if meta, err = r.FetchMetadata(hostname, cluster); err != nil {
-	// 		cclog.Warnf("Error while fetching metadata for node '%s'", hostname)
-	// 		return nil, err
-	// 	}
-	// 	node.MetaData = meta
-	// }
+	if withMeta {
+		meta, metaErr := r.FetchMetadata(node.Hostname, node.Cluster)
+		if metaErr != nil {
+			cclog.Warnf("Error while fetching metadata for node ID '%d': %v", id, metaErr)
+			return nil, metaErr
+		}
+		node.MetaData = meta
+	}

 	return node, nil
 }
@@ -285,7 +283,7 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode
 		Join("node ON node_state.node_id = node.id").
 		Where(sq.Lt{"node_state.time_stamp": cutoff}).
 		Where("node_state.id NOT IN (SELECT ns2.id FROM node_state ns2 WHERE ns2.time_stamp = (SELECT MAX(ns3.time_stamp) FROM node_state ns3 WHERE ns3.node_id = ns2.node_id))").
-		OrderBy("node_state.time_stamp ASC").
+		OrderBy("node.cluster ASC", "node.subcluster ASC", "node.hostname ASC", "node_state.time_stamp ASC").
 		RunWith(r.DB).Query()
 	if err != nil {
 		return nil, err
@@ -295,13 +293,15 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode
 	var result []NodeStateWithNode
 	for rows.Next() {
 		var ns NodeStateWithNode
+		var healthMetrics sql.NullString
 		if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState,
-			&ns.HealthState, &ns.HealthMetrics,
+			&ns.HealthState, &healthMetrics,
 			&ns.CpusAllocated, &ns.MemoryAllocated,
 			&ns.GpusAllocated, &ns.JobsRunning,
 			&ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil {
 			return nil, err
 		}
+		ns.HealthMetrics = healthMetrics.String
 		result = append(result, ns)
 	}
 	return result, nil
@@ -382,6 +382,81 @@ func (r *NodeRepository) QueryNodes(
 	return nodes, nil
 }

+// QueryNodesWithMeta returns a list of nodes based on a node filter. It always operates
+// on the last state (largest timestamp). It includes both (!) optional JSON column data
+func (r *NodeRepository) QueryNodesWithMeta(
+	ctx context.Context,
+	filters []*model.NodeFilter,
+	page *model.PageRequest,
+	order *model.OrderByInput, // Currently unused!
+) ([]*schema.Node, error) {
+	query, qerr := AccessCheck(ctx,
+		sq.Select("node.hostname", "node.cluster", "node.subcluster",
+			"node_state.node_state", "node_state.health_state",
+			"node.meta_data", "node_state.health_metrics").
+			From("node").
+			Join("node_state ON node_state.node_id = node.id").
+			Where(latestStateCondition()))
+	if qerr != nil {
+		return nil, qerr
+	}
+
+	query = applyNodeFilters(query, filters)
+	query = query.OrderBy("node.hostname ASC")
+
+	if page != nil && page.ItemsPerPage != -1 {
+		limit := uint64(page.ItemsPerPage)
+		query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
+	}
+
+	rows, err := query.RunWith(r.stmtCache).Query()
+	if err != nil {
+		queryString, queryVars, _ := query.ToSql()
+		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
+		return nil, err
+	}
+
+	nodes := make([]*schema.Node, 0)
+	for rows.Next() {
+		node := schema.Node{}
+		RawMetaData := make([]byte, 0)
+		RawMetricHealth := make([]byte, 0)
+
+		if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
+			&node.NodeState, &node.HealthState, &RawMetaData, &RawMetricHealth); err != nil {
+			rows.Close()
+			cclog.Warn("Error while scanning rows (QueryNodes)")
+			return nil, err
+		}
+
+		if len(RawMetaData) == 0 {
+			node.MetaData = nil
+		} else {
+			metaData := make(map[string]string)
+			if err := json.Unmarshal(RawMetaData, &metaData); err != nil {
+				cclog.Warn("Error while unmarshaling raw metadata json")
+				return nil, err
+			}
+			node.MetaData = metaData
+		}
+
+		if len(RawMetricHealth) == 0 {
+			node.HealthData = nil
+		} else {
+			healthData := make(map[string][]string)
+			if err := json.Unmarshal(RawMetricHealth, &healthData); err != nil {
+				cclog.Warn("Error while unmarshaling raw healthdata json")
+				return nil, err
+			}
+			node.HealthData = healthData
+		}
+
+		nodes = append(nodes, &node)
+	}
+
+	return nodes, nil
+}
+
 // CountNodes returns the total matched nodes based on a node filter. It always operates
 // on the last state (largest timestamp) per node.
 func (r *NodeRepository) CountNodes(
@@ -62,7 +62,7 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error {
 func (r *JobRepository) TransactionAddNamed(
 	t *Transaction,
 	query string,
-	args ...interface{},
+	args ...any,
 ) (int64, error) {
 	if t.tx == nil {
 		return 0, fmt.Errorf("transaction is nil or already completed")
@@ -82,7 +82,7 @@ func (r *JobRepository) TransactionAddNamed(
 }

 // TransactionAdd executes a query within the transaction.
-func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...interface{}) (int64, error) {
+func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...any) (int64, error) {
 	if t.tx == nil {
 		return 0, fmt.Errorf("transaction is nil or already completed")
 	}
@@ -189,7 +189,7 @@ func TestTransactionAddNamed(t *testing.T) {
 		tx := &Transaction{tx: nil}

 		_, err := r.TransactionAddNamed(tx, "INSERT INTO tag (tag_type, tag_name, tag_scope) VALUES (:type, :name, :scope)",
-			map[string]interface{}{"type": "test", "name": "test", "scope": "global"})
+			map[string]any{"type": "test", "name": "test", "scope": "global"})
 		assert.Error(t, err, "Should error on nil transaction")
 		assert.Contains(t, err.Error(), "transaction is nil or already completed")
 	})
@@ -204,7 +204,7 @@ func TestTransactionMultipleOperations(t *testing.T) {
 		defer tx.Rollback()

 		// Insert multiple tags
-		for i := 0; i < 5; i++ {
+		for i := range 5 {
 			_, err = r.TransactionAdd(tx,
 				"INSERT INTO tag (tag_type, tag_name, tag_scope) VALUES (?, ?, ?)",
 				"test_type", "test_multi_"+string(rune('a'+i)), "global")
@@ -230,7 +230,7 @@ func TestTransactionMultipleOperations(t *testing.T) {
 		require.NoError(t, err)

 		// Insert multiple tags
-		for i := 0; i < 3; i++ {
+		for i := range 3 {
 			_, err = r.TransactionAdd(tx,
 				"INSERT INTO tag (tag_type, tag_name, tag_scope) VALUES (?, ?, ?)",
 				"test_type", "test_rollback_"+string(rune('a'+i)), "global")
@@ -126,7 +126,7 @@ func (r *UserRepository) AddUser(user *schema.User) error {
 	projectsJson, _ := json.Marshal(user.Projects)

 	cols := []string{"username", "roles", "projects"}
-	vals := []interface{}{user.Username, string(rolesJson), string(projectsJson)}
+	vals := []any{user.Username, string(rolesJson), string(projectsJson)}

 	if user.Name != "" {
 		cols = append(cols, "name")
@@ -392,7 +392,7 @@ func (r *UserRepository) RemoveProject(ctx context.Context, username string, pro
 	}

 	if exists {
-		var result interface{}
+		var result any
 		if len(newprojects) == 0 {
 			result = "[]"
 		} else {
@@ -23,7 +23,7 @@ import (
 	"github.com/go-chi/chi/v5"
 )

-type InfoType map[string]interface{}
+type InfoType map[string]any

 type Route struct {
 	Route string
@@ -50,6 +50,7 @@ var routes []Route = []Route{
 	{"/monitoring/status/{cluster}", "monitoring/status.tmpl", "<ID> Dashboard - ClusterCockpit", false, setupClusterStatusRoute},
 	{"/monitoring/status/detail/{cluster}", "monitoring/status.tmpl", "Status of <ID> - ClusterCockpit", false, setupClusterDetailRoute},
 	{"/monitoring/dashboard/{cluster}", "monitoring/dashboard.tmpl", "<ID> Dashboard - ClusterCockpit", false, setupDashboardRoute},
+	{"/monitoring/logs", "monitoring/logs.tmpl", "Logs - ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { return i }},
 }

 func setupHomeRoute(i InfoType, r *http.Request) InfoType {
@@ -192,7 +193,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType {
 func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 	jobRepo := repository.GetJobRepository()
 	tags, counts, err := jobRepo.CountTags(repository.GetUserFromContext(r.Context()))
-	tagMap := make(map[string][]map[string]interface{})
+	tagMap := make(map[string][]map[string]any)
 	if err != nil {
 		cclog.Warnf("GetTags failed: %s", err.Error())
 		i["tagmap"] = tagMap
@@ -203,7 +204,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 	// Uses tag.ID as second Map-Key component to differentiate tags with identical names
 	if userAuthlevel >= 4 { // Support+ : Show tags for all scopes, regardless of count
 		for _, tag := range tags {
-			tagItem := map[string]interface{}{
+			tagItem := map[string]any{
 				"id":    tag.ID,
 				"name":  tag.Name,
 				"scope": tag.Scope,
@@ -215,7 +216,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 		for _, tag := range tags {
 			tagCount := counts[fmt.Sprint(tag.Type, tag.Name, tag.ID)]
 			if ((tag.Scope == "global" || tag.Scope == "admin") && tagCount >= 1) || (tag.Scope != "global" && tag.Scope != "admin") {
-				tagItem := map[string]interface{}{
+				tagItem := map[string]any{
 					"id":    tag.ID,
 					"name":  tag.Name,
 					"scope": tag.Scope,
@@ -231,8 +232,8 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 }

 // FIXME: Lots of redundant code. Needs refactoring
-func buildFilterPresets(query url.Values) map[string]interface{} {
-	filterPresets := map[string]interface{}{}
+func buildFilterPresets(query url.Values) map[string]any {
+	filterPresets := map[string]any{}

 	if query.Get("cluster") != "" {
 		filterPresets["cluster"] = query.Get("cluster")
@@ -376,14 +377,14 @@ func buildFilterPresets(query url.Values) map[string]any {
 		}
 	}
 	if len(query["stat"]) != 0 {
-		statList := make([]map[string]interface{}, 0)
+		statList := make([]map[string]any, 0)
 		for _, statEntry := range query["stat"] {
 			parts := strings.Split(statEntry, "-")
 			if len(parts) == 3 { // Metric Footprint Stat Field, from - to
 				a, e1 := strconv.ParseInt(parts[1], 10, 64)
 				b, e2 := strconv.ParseInt(parts[2], 10, 64)
 				if e1 == nil && e2 == nil {
-					statEntry := map[string]interface{}{
+					statEntry := map[string]any{
 						"field": parts[0],
 						"from":  a,
 						"to":    b,
|
||||
func SetupRoutes(router chi.Router, buildInfo web.Build) {
|
||||
userCfgRepo := repository.GetUserCfgRepo()
|
||||
for _, route := range routes {
|
||||
route := route
|
||||
router.HandleFunc(route.Route, func(rw http.ResponseWriter, r *http.Request) {
|
||||
conf, err := userCfgRepo.GetUIConfig(repository.GetUserFromContext(r.Context()))
|
||||
if err != nil {
|
||||
@@ -409,7 +409,7 @@ func SetupRoutes(router chi.Router, buildInfo web.Build) {
 			}

 			title := route.Title
-			infos := route.Setup(map[string]interface{}{}, r)
+			infos := route.Setup(map[string]any{}, r)
 			if id, ok := infos["id"]; ok {
 				title = strings.Replace(route.Title, "<ID>", id.(string), 1)
 				if sid, ok := infos["sid"]; ok { // 2nd ID element
@@ -19,6 +19,14 @@ import (
 	"github.com/ClusterCockpit/cc-lib/v2/util"
 )

+func metadataKeys(m map[string]string) []string {
+	keys := make([]string, 0, len(m))
+	for k := range m {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
 const (
 	// defaultConfigPath is the default path for application tagging configuration
 	defaultConfigPath = "./var/tagger/apps"
@@ -52,7 +60,10 @@ func (t *AppTagger) scanApp(f *os.File, fns string) {
 	ai := appInfo{tag: tag, patterns: make([]*regexp.Regexp, 0)}

 	for scanner.Scan() {
-		line := scanner.Text()
+		line := strings.TrimSpace(scanner.Text())
+		if line == "" {
+			continue
+		}
 		re, err := regexp.Compile(line)
 		if err != nil {
 			cclog.Errorf("invalid regex pattern '%s' in %s: %v", line, fns, err)
@@ -68,6 +79,8 @@ func (t *AppTagger) scanApp(f *os.File, fns string) {
 			break
 		}
 	}
+
+	cclog.Infof("AppTagger loaded %d patterns for %s", len(ai.patterns), tag)
 	t.apps = append(t.apps, ai)
 }
@@ -86,6 +99,9 @@ func (t *AppTagger) EventCallback() {
 	}

 	for _, fn := range files {
+		if fn.IsDir() {
+			continue
+		}
 		fns := fn.Name()
 		cclog.Debugf("Process: %s", fns)
 		f, err := os.Open(filepath.Join(t.cfgPath, fns))
@@ -121,6 +137,9 @@ func (t *AppTagger) Register() error {
 	}

 	for _, fn := range files {
+		if fn.IsDir() {
+			continue
+		}
 		fns := fn.Name()
 		cclog.Debugf("Process: %s", fns)
 		f, err := os.Open(filepath.Join(t.cfgPath, fns))
@@ -147,29 +166,54 @@ func (t *AppTagger) Register() error {
 // Only the first matching application is tagged.
 func (t *AppTagger) Match(job *schema.Job) {
 	r := repository.GetJobRepository()

+	if len(t.apps) == 0 {
+		cclog.Warn("AppTagger: no app patterns loaded, skipping match")
+		return
+	}
+
 	metadata, err := r.FetchMetadata(job)
 	if err != nil {
-		cclog.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster)
+		cclog.Infof("AppTagger: cannot fetch metadata for job %d on %s: %v", job.JobID, job.Cluster, err)
 		return
 	}

+	if metadata == nil {
+		cclog.Infof("AppTagger: metadata is nil for job %d on %s", job.JobID, job.Cluster)
+		return
+	}
+
 	jobscript, ok := metadata["jobScript"]
-	if ok {
-		id := *job.ID
-		jobscriptLower := strings.ToLower(jobscript)
+	if !ok {
+		cclog.Infof("AppTagger: no 'jobScript' key in metadata for job %d on %s (keys: %v)",
+			job.JobID, job.Cluster, metadataKeys(metadata))
+		return
+	}

-	out:
-		for _, a := range t.apps {
-			for _, re := range a.patterns {
-				if re.MatchString(jobscriptLower) {
-					if !r.HasTag(id, t.tagType, a.tag) {
-						r.AddTagOrCreateDirect(id, t.tagType, a.tag)
-						break out
+	if len(jobscript) == 0 {
+		cclog.Infof("AppTagger: empty jobScript for job %d on %s", job.JobID, job.Cluster)
+		return
+	}
+
+	id := *job.ID
+	jobscriptLower := strings.ToLower(jobscript)
+	cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps))
+
+	for _, a := range t.apps {
+		for _, re := range a.patterns {
+			if re.MatchString(jobscriptLower) {
+				if r.HasTag(id, t.tagType, a.tag) {
+					cclog.Debugf("AppTagger: job %d already has tag %s:%s, skipping", id, t.tagType, a.tag)
+				} else {
+					cclog.Infof("AppTagger: pattern '%s' matched for app '%s' on job %d", re.String(), a.tag, id)
+					if _, err := r.AddTagOrCreateDirect(id, t.tagType, a.tag); err != nil {
+						cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
 					}
 				}
+				return
 			}
 		}
-	} else {
-		cclog.Infof("Cannot extract job script for job: %d on %s", job.JobID, job.Cluster)
 	}
+
+	cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster)
 }
@@ -51,10 +51,14 @@ func newTagger() {
 	jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{})

 	for _, tagger := range jobTagger.startTaggers {
-		tagger.Register()
+		if err := tagger.Register(); err != nil {
+			cclog.Errorf("failed to register start tagger: %s", err)
+		}
 	}
 	for _, tagger := range jobTagger.stopTaggers {
-		tagger.Register()
+		if err := tagger.Register(); err != nil {
+			cclog.Errorf("failed to register stop tagger: %s", err)
+		}
 	}
 }
@@ -18,7 +18,7 @@ import (
 func RegisterNodeStateRetentionDeleteService(ageHours int) {
 	cclog.Info("Register node state retention delete service")

-	s.NewJob(gocron.DurationJob(1*time.Hour),
+	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))),
 		gocron.NewTask(
 			func() {
 				cutoff := time.Now().Unix() - int64(ageHours*3600)
@@ -32,8 +32,8 @@ func RegisterNodeStateRetentionDeleteService(ageHours int) {
 		}))
 }

-func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) {
-	cclog.Info("Register node state retention parquet service")
+func RegisterNodeStateRetentionMoveService(cfg *config.NodeStateRetention) {
+	cclog.Info("Register node state retention move service")

 	maxFileSizeMB := cfg.MaxFileSizeMB
 	if maxFileSizeMB <= 0 {
@@ -63,11 +63,11 @@ func RegisterNodeStateRetentionMoveService(cfg *config.NodeStateRetention) {
 	}

 	if err != nil {
-		cclog.Errorf("NodeState parquet retention: failed to create target: %v", err)
+		cclog.Errorf("NodeState move retention: failed to create target: %v", err)
 		return
 	}

-	s.NewJob(gocron.DurationJob(1*time.Hour),
+	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 30, 0))),
 		gocron.NewTask(
 			func() {
 				cutoff := time.Now().Unix() - int64(ageHours*3600)
@@ -75,14 +75,14 @@ func RegisterNodeStateRetentionMoveService(cfg *config.NodeStateRetention) {

 				rows, err := nodeRepo.FindNodeStatesBefore(cutoff)
 				if err != nil {
-					cclog.Errorf("NodeState parquet retention: error finding rows: %v", err)
+					cclog.Errorf("NodeState move retention: error finding rows: %v", err)
 					return
 				}
 				if len(rows) == 0 {
 					return
 				}

-				cclog.Infof("NodeState parquet retention: archiving %d rows", len(rows))
+				cclog.Infof("NodeState move retention: archiving %d rows", len(rows))
 				pw := pqarchive.NewNodeStateParquetWriter(target, maxFileSizeMB)

 				for _, ns := range rows {
@@ -100,21 +100,21 @@ func RegisterNodeStateRetentionMoveService(cfg *config.NodeStateRetention) {
 						SubCluster: ns.SubCluster,
 					}
 					if err := pw.AddRow(row); err != nil {
-						cclog.Errorf("NodeState parquet retention: add row: %v", err)
+						cclog.Errorf("NodeState move retention: add row: %v", err)
 						continue
 					}
 				}

 				if err := pw.Close(); err != nil {
-					cclog.Errorf("NodeState parquet retention: close writer: %v", err)
+					cclog.Errorf("NodeState move retention: close writer: %v", err)
 					return
 				}

 				cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff)
 				if err != nil {
-					cclog.Errorf("NodeState parquet retention: error deleting rows: %v", err)
+					cclog.Errorf("NodeState move retention: error deleting rows: %v", err)
 				} else {
-					cclog.Infof("NodeState parquet retention: deleted %d rows from db", cnt)
+					cclog.Infof("NodeState move retention: deleted %d rows from db", cnt)
 				}
 			}))
 }
@@ -45,13 +45,13 @@ func createTargetBackend(cfg Retention) (archive.ArchiveBackend, error) {

 	switch cfg.TargetKind {
 	case "s3":
-		raw, err = json.Marshal(map[string]interface{}{
-			"kind":       "s3",
-			"endpoint":   cfg.TargetEndpoint,
-			"bucket":     cfg.TargetBucket,
-			"access-key": cfg.TargetAccessKey,
-			"secret-key": cfg.TargetSecretKey,
-			"region":     cfg.TargetRegion,
+		raw, err = json.Marshal(map[string]any{
+			"kind":           "s3",
+			"endpoint":       cfg.TargetEndpoint,
+			"bucket":         cfg.TargetBucket,
+			"access-key":     cfg.TargetAccessKey,
+			"secret-key":     cfg.TargetSecretKey,
+			"region":         cfg.TargetRegion,
 			"use-path-style": cfg.TargetUsePathStyle,
 		})
 	default:
@@ -154,8 +154,8 @@ func initNodeStateRetention() {
 	switch cfg.Policy {
 	case "delete":
 		RegisterNodeStateRetentionDeleteService(age)
-	case "parquet":
-		RegisterNodeStateRetentionParquetService(cfg)
+	case "move":
+		RegisterNodeStateRetentionMoveService(cfg)
 	default:
 		cclog.Warnf("Unknown nodestate-retention policy: %s", cfg.Policy)
 	}