From 54ea5d790054dc87b82d43b61b9ac6b180f36684 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 12 Feb 2026 09:21:44 +0100 Subject: [PATCH] Add nodestate retention and archiving --- internal/config/config.go | 17 +++ internal/config/schema.go | 53 ++++++++ internal/repository/node.go | 65 ++++++++++ .../taskmanager/nodestateRetentionService.go | 120 ++++++++++++++++++ internal/taskmanager/taskManager.go | 21 +++ pkg/archive/parquet/nodestate_schema.go | 20 +++ pkg/archive/parquet/nodestate_writer.go | 104 +++++++++++++++ 7 files changed, 400 insertions(+) create mode 100644 internal/taskmanager/nodestateRetentionService.go create mode 100644 pkg/archive/parquet/nodestate_schema.go create mode 100644 pkg/archive/parquet/nodestate_writer.go diff --git a/internal/config/config.go b/internal/config/config.go index 4e6fe975..2e601ed7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -71,6 +71,23 @@ type ProgramConfig struct { // If exists, will enable dynamic zoom in frontend metric plots using the configured values EnableResampling *ResampleConfig `json:"resampling"` + + // Node state retention configuration + NodeStateRetention *NodeStateRetention `json:"nodestate-retention"` +} + +type NodeStateRetention struct { + Policy string `json:"policy"` // "delete" or "parquet" + Age int `json:"age"` // hours, default 24 + TargetKind string `json:"target-kind"` // "file" or "s3" + TargetPath string `json:"target-path"` + TargetEndpoint string `json:"target-endpoint"` + TargetBucket string `json:"target-bucket"` + TargetAccessKey string `json:"target-access-key"` + TargetSecretKey string `json:"target-secret-key"` + TargetRegion string `json:"target-region"` + TargetUsePathStyle bool `json:"target-use-path-style"` + MaxFileSizeMB int `json:"max-file-size-mb"` } type ResampleConfig struct { diff --git a/internal/config/schema.go b/internal/config/schema.go index 0d575b3c..bd1b314e 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -130,6 +130,59 @@ var configSchema = ` } }, "required": ["subject-job-event", "subject-node-state"] + }, + "nodestate-retention": { + "description": "Node state retention configuration for cleaning up old node_state rows.", + "type": "object", + "properties": { + "policy": { + "description": "Retention policy: 'delete' to remove old rows, 'parquet' to archive then delete.", + "type": "string", + "enum": ["delete", "parquet"] + }, + "age": { + "description": "Retention age in hours (default: 24).", + "type": "integer" + }, + "target-kind": { + "description": "Target kind for parquet archiving: 'file' or 's3'.", + "type": "string", + "enum": ["file", "s3"] + }, + "target-path": { + "description": "Filesystem path for parquet file target.", + "type": "string" + }, + "target-endpoint": { + "description": "S3 endpoint URL.", + "type": "string" + }, + "target-bucket": { + "description": "S3 bucket name.", + "type": "string" + }, + "target-access-key": { + "description": "S3 access key.", + "type": "string" + }, + "target-secret-key": { + "description": "S3 secret key.", + "type": "string" + }, + "target-region": { + "description": "S3 region.", + "type": "string" + }, + "target-use-path-style": { + "description": "Use path-style S3 addressing.", + "type": "boolean" + }, + "max-file-size-mb": { + "description": "Maximum parquet file size in MB (default: 128).", + "type": "integer" + } + }, + "required": ["policy"] } } }` diff --git a/internal/repository/node.go b/internal/repository/node.go index 82dcf067..a746182b 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -225,6 +225,71 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt // return nil // } +// NodeStateWithNode combines a node state row with denormalized node info. +type NodeStateWithNode struct { + ID int64 `db:"id"` + TimeStamp int64 `db:"time_stamp"` + NodeState string `db:"node_state"` + HealthState string `db:"health_state"` + HealthMetrics string `db:"health_metrics"` + CpusAllocated int `db:"cpus_allocated"` + MemoryAllocated int64 `db:"memory_allocated"` + GpusAllocated int `db:"gpus_allocated"` + JobsRunning int `db:"jobs_running"` + Hostname string `db:"hostname"` + Cluster string `db:"cluster"` + SubCluster string `db:"subcluster"` +} + +// FindNodeStatesBefore returns all node_state rows with time_stamp < cutoff, +// joined with node info for denormalized archiving. +func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) { + rows, err := sq.Select( + "node_state.id", "node_state.time_stamp", "node_state.node_state", + "node_state.health_state", "node_state.health_metrics", + "node_state.cpus_allocated", "node_state.memory_allocated", + "node_state.gpus_allocated", "node_state.jobs_running", + "node.hostname", "node.cluster", "node.subcluster", + ). + From("node_state"). + Join("node ON node_state.node_id = node.id"). + Where(sq.Lt{"node_state.time_stamp": cutoff}). + Where("node_state.id NOT IN (SELECT MAX(id) FROM node_state GROUP BY node_id)"). + OrderBy("node_state.time_stamp ASC"). + RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + var result []NodeStateWithNode + for rows.Next() { + var ns NodeStateWithNode + if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState, + &ns.HealthState, &ns.HealthMetrics, + &ns.CpusAllocated, &ns.MemoryAllocated, + &ns.GpusAllocated, &ns.JobsRunning, + &ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil { + return nil, err + } + result = append(result, ns) + } + return result, nil +} + +// DeleteNodeStatesBefore removes node_state rows with time_stamp < cutoff, +// but always preserves the latest row per node_id. +func (r *NodeRepository) DeleteNodeStatesBefore(cutoff int64) (int64, error) { + res, err := r.DB.Exec( + `DELETE FROM node_state WHERE time_stamp < ? AND id NOT IN (SELECT MAX(id) FROM node_state GROUP BY node_id)`, + cutoff, + ) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (r *NodeRepository) DeleteNode(id int64) error { _, err := r.DB.Exec(`DELETE FROM node WHERE node.id = ?`, id) if err != nil { diff --git a/internal/taskmanager/nodestateRetentionService.go b/internal/taskmanager/nodestateRetentionService.go new file mode 100644 index 00000000..9a704502 --- /dev/null +++ b/internal/taskmanager/nodestateRetentionService.go @@ -0,0 +1,120 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package taskmanager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/repository" + pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/go-co-op/gocron/v2" +) + +func RegisterNodeStateRetentionDeleteService(ageHours int) { + cclog.Info("Register node state retention delete service") + + s.NewJob(gocron.DurationJob(1*time.Hour), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(ageHours*3600) + nodeRepo := repository.GetNodeRepository() + cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState retention: error deleting old rows: %v", err) + } else if cnt > 0 { + cclog.Infof("NodeState retention: deleted %d old rows", cnt) + } + })) +} + +func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { + cclog.Info("Register node state retention parquet service") + + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 128 + } + + ageHours := cfg.Age + if ageHours <= 0 { + ageHours = 24 + } + + var target pqarchive.ParquetTarget + var err error + + switch cfg.TargetKind { + case "s3": + target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.TargetEndpoint, + Bucket: cfg.TargetBucket, + AccessKey: cfg.TargetAccessKey, + SecretKey: cfg.TargetSecretKey, + Region: cfg.TargetRegion, + UsePathStyle: cfg.TargetUsePathStyle, + }) + default: + target, err = pqarchive.NewFileTarget(cfg.TargetPath) + } + + if err != nil { + cclog.Errorf("NodeState parquet retention: failed to create target: %v", err) + return + } + + s.NewJob(gocron.DurationJob(1*time.Hour), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(ageHours*3600) + nodeRepo := repository.GetNodeRepository() + + rows, err := nodeRepo.FindNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState parquet retention: error finding rows: %v", err) + return + } + if len(rows) == 0 { + return + } + + cclog.Infof("NodeState parquet retention: archiving %d rows", len(rows)) + pw := pqarchive.NewNodeStateParquetWriter(target, maxFileSizeMB) + + for _, ns := range rows { + row := pqarchive.ParquetNodeStateRow{ + TimeStamp: ns.TimeStamp, + NodeState: ns.NodeState, + HealthState: ns.HealthState, + HealthMetrics: ns.HealthMetrics, + CpusAllocated: int32(ns.CpusAllocated), + MemoryAllocated: ns.MemoryAllocated, + GpusAllocated: int32(ns.GpusAllocated), + JobsRunning: int32(ns.JobsRunning), + Hostname: ns.Hostname, + Cluster: ns.Cluster, + SubCluster: ns.SubCluster, + } + if err := pw.AddRow(row); err != nil { + cclog.Errorf("NodeState parquet retention: add row: %v", err) + continue + } + } + + if err := pw.Close(); err != nil { + cclog.Errorf("NodeState parquet retention: close writer: %v", err) + return + } + + cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState parquet retention: error deleting rows: %v", err) + } else { + cclog.Infof("NodeState parquet retention: deleted %d rows from db", cnt) + } + })) +} diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index e323557b..8cf6b4e6 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -144,9 +144,30 @@ func Start(cronCfg, archiveConfig json.RawMessage) { RegisterUpdateDurationWorker() RegisterCommitJobService() + if config.Keys.NodeStateRetention != nil && config.Keys.NodeStateRetention.Policy != "" { + initNodeStateRetention() + } + s.Start() } +func initNodeStateRetention() { + cfg := config.Keys.NodeStateRetention + age := cfg.Age + if age <= 0 { + age = 24 + } + + switch cfg.Policy { + case "delete": + RegisterNodeStateRetentionDeleteService(age) + case "parquet": + RegisterNodeStateRetentionParquetService(cfg) + default: + cclog.Warnf("Unknown nodestate-retention policy: %s", cfg.Policy) + } +} + // Shutdown stops the task manager and its scheduler. func Shutdown() { if s != nil { diff --git a/pkg/archive/parquet/nodestate_schema.go b/pkg/archive/parquet/nodestate_schema.go new file mode 100644 index 00000000..c9dfe363 --- /dev/null +++ b/pkg/archive/parquet/nodestate_schema.go @@ -0,0 +1,20 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +type ParquetNodeStateRow struct { + TimeStamp int64 `parquet:"time_stamp"` + NodeState string `parquet:"node_state"` + HealthState string `parquet:"health_state"` + HealthMetrics string `parquet:"health_metrics,optional"` + CpusAllocated int32 `parquet:"cpus_allocated"` + MemoryAllocated int64 `parquet:"memory_allocated"` + GpusAllocated int32 `parquet:"gpus_allocated"` + JobsRunning int32 `parquet:"jobs_running"` + Hostname string `parquet:"hostname"` + Cluster string `parquet:"cluster"` + SubCluster string `parquet:"subcluster"` +} diff --git a/pkg/archive/parquet/nodestate_writer.go b/pkg/archive/parquet/nodestate_writer.go new file mode 100644 index 00000000..053417d6 --- /dev/null +++ b/pkg/archive/parquet/nodestate_writer.go @@ -0,0 +1,104 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "bytes" + "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + pq "github.com/parquet-go/parquet-go" +) + +// NodeStateParquetWriter batches ParquetNodeStateRows and flushes them to a target +// when the estimated size exceeds maxSizeBytes. +type NodeStateParquetWriter struct { + target ParquetTarget + maxSizeBytes int64 + rows []ParquetNodeStateRow + currentSize int64 + fileCounter int + datePrefix string +} + +// NewNodeStateParquetWriter creates a new writer for node state parquet files. +func NewNodeStateParquetWriter(target ParquetTarget, maxSizeMB int) *NodeStateParquetWriter { + return &NodeStateParquetWriter{ + target: target, + maxSizeBytes: int64(maxSizeMB) * 1024 * 1024, + datePrefix: time.Now().Format("2006-01-02"), + } +} + +// AddRow adds a row to the current batch. If the estimated batch size +// exceeds the configured maximum, the batch is flushed first. +func (pw *NodeStateParquetWriter) AddRow(row ParquetNodeStateRow) error { + rowSize := estimateNodeStateRowSize(&row) + + if pw.currentSize+rowSize > pw.maxSizeBytes && len(pw.rows) > 0 { + if err := pw.Flush(); err != nil { + return err + } + } + + pw.rows = append(pw.rows, row) + pw.currentSize += rowSize + return nil +} + +// Flush writes the current batch to a parquet file on the target. +func (pw *NodeStateParquetWriter) Flush() error { + if len(pw.rows) == 0 { + return nil + } + + pw.fileCounter++ + fileName := fmt.Sprintf("cc-nodestate-%s-%03d.parquet", pw.datePrefix, pw.fileCounter) + + data, err := writeNodeStateParquetBytes(pw.rows) + if err != nil { + return fmt.Errorf("write parquet buffer: %w", err) + } + + if err := pw.target.WriteFile(fileName, data); err != nil { + return fmt.Errorf("write parquet file %q: %w", fileName, err) + } + + cclog.Infof("NodeState retention: wrote %s (%d rows, %d bytes)", fileName, len(pw.rows), len(data)) + pw.rows = pw.rows[:0] + pw.currentSize = 0 + return nil +} + +// Close flushes any remaining rows and finalizes the writer. +func (pw *NodeStateParquetWriter) Close() error { + return pw.Flush() +} + +func writeNodeStateParquetBytes(rows []ParquetNodeStateRow) ([]byte, error) { + var buf bytes.Buffer + + writer := pq.NewGenericWriter[ParquetNodeStateRow](&buf, + pq.Compression(&pq.Snappy), + ) + + if _, err := writer.Write(rows); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func estimateNodeStateRowSize(row *ParquetNodeStateRow) int64 { + size := int64(100) // fixed numeric fields + size += int64(len(row.NodeState) + len(row.HealthState) + len(row.HealthMetrics)) + size += int64(len(row.Hostname) + len(row.Cluster) + len(row.SubCluster)) + return size +}