Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2026-01-20 11:31:46 +01:00)

Merge branch 'dev' of https://github.com/ClusterCockpit/cc-backend into dev
@@ -16,8 +16,7 @@
     "checkpoints": {
       "interval": "12h"
     },
-    "retention-in-memory": "48h",
+    "retention-in-memory": "2m",
     "memory-cap": 100
   }
 }
-
@@ -5,13 +5,18 @@
   "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
   "user": "clustercockpit",
   "group": "clustercockpit",
-  "apiAllowedIPs": ["*"],
+  "apiAllowedIPs": [
+    "*"
+  ],
   "short-running-jobs-duration": 300,
   "enable-job-taggers": true,
   "resampling": {
     "minimumPoints": 600,
     "trigger": 180,
-    "resolutions": [240, 60]
+    "resolutions": [
+      240,
+      60
+    ]
   },
   "apiSubjects": {
     "subjectJobEvent": "cc.job.event",
@@ -47,10 +52,16 @@
   },
   "metric-store": {
     "checkpoints": {
-      "interval": "12h"
+      "interval": "12h",
+      "directory": "./var/checkpoints"
     },
     "memory-cap": 100,
     "retention-in-memory": "48h",
+    "cleanup": {
+      "mode": "archive",
+      "interval": "48h",
+      "directory": "./var/archive"
+    },
     "nats-subscriptions": [
       {
         "subscribe-to": "hpc-nats",
@@ -63,4 +74,4 @@
     ]
   },
   "ui-file": "ui-config.json"
 }
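The interval, retention, and cleanup values in these config samples ("12h", "2m", "48h") are Go-style duration strings. As a rough illustration only (not the project's actual parsing code; the helper name and checked keys are made up), they can be validated with time.ParseDuration:

package main

import (
    "fmt"
    "time"
)

// checkDurations is a hypothetical helper: it only demonstrates that the
// duration strings used in the config samples parse with time.ParseDuration.
func checkDurations(values map[string]string) error {
    for key, val := range values {
        d, err := time.ParseDuration(val)
        if err != nil {
            return fmt.Errorf("%s: invalid duration %q: %w", key, val, err)
        }
        fmt.Printf("%s = %s (%v)\n", key, val, d)
    }
    return nil
}

func main() {
    // Values taken from the config diffs above.
    _ = checkDurations(map[string]string{
        "checkpoints.interval": "12h",
        "retention-in-memory":  "2m",
        "cleanup.interval":     "48h",
    })
}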
@@ -23,19 +23,23 @@ import (
 
 // Worker for either Archiving or Deleting files
 
-func Archiving(wg *sync.WaitGroup, ctx context.Context) {
-    if Keys.Archive != nil {
+func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
+    if Keys.Cleanup.Mode == "archive" {
         // Run as Archiver
-        runWorker(wg, ctx,
-            Keys.Archive.ArchiveInterval,
+        cleanUpWorker(wg, ctx,
+            Keys.Cleanup.Interval,
             "archiving",
-            Keys.Archive.RootDir,
-            Keys.Archive.DeleteInstead,
+            Keys.Cleanup.RootDir,
+            false,
         )
     } else {
+        if Keys.Cleanup.Interval == "" {
+            Keys.Cleanup.Interval = Keys.RetentionInMemory
+        }
+
         // Run as Deleter
-        runWorker(wg, ctx,
-            Keys.RetentionInMemory,
+        cleanUpWorker(wg, ctx,
+            Keys.Cleanup.Interval,
             "deleting",
             "",
             true,
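To make the new dispatch easier to follow outside the diff context, here is a small self-contained sketch of the same idea: a cleanup configuration whose mode selects between archiving and deleting, with the interval falling back to the in-memory retention when unset. The types and helper below are simplified stand-ins, not the package's real definitions.

package main

import "fmt"

// Simplified stand-in for the configuration fields referenced in the diff.
type cleanupConfig struct {
    Mode     string // "archive" or "delete"
    Interval string
    RootDir  string
}

// dispatchCleanup mirrors the branching shown above: archive mode keeps the
// checkpoints (zipped into RootDir), any other mode deletes them outright.
func dispatchCleanup(c cleanupConfig, retentionInMemory string) (mode, interval, dir string, del bool) {
    if c.Mode == "archive" {
        return "archiving", c.Interval, c.RootDir, false
    }
    if c.Interval == "" {
        c.Interval = retentionInMemory // fall back to retention-in-memory
    }
    return "deleting", c.Interval, "", true
}

func main() {
    m, iv, dir, del := dispatchCleanup(cleanupConfig{Mode: "archive", Interval: "48h", RootDir: "./var/archive"}, "48h")
    fmt.Println(m, iv, dir, del) // archiving 48h ./var/archive false

    m, iv, dir, del = dispatchCleanup(cleanupConfig{Mode: "delete"}, "48h")
    fmt.Println(m, iv, dir, del) // deleting 48h  true
}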
@@ -44,7 +48,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
 }
 
 // runWorker takes simple values to configure what it does
-func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) {
+func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
     go func() {
         defer wg.Done()
 
@@ -67,12 +71,12 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st
             t := time.Now().Add(-d)
             cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
 
-            n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete)
+            n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
 
             if err != nil {
                 cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error())
             } else {
-                if delete && archiveDir == "" {
+                if delete && cleanupDir == "" {
                     cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n)
                 } else {
                     cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n)
@@ -85,9 +89,9 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st
 
 var ErrNoNewArchiveData error = errors.New("all data already archived")
 
-// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
-// deleting them from the `checkpointsDir`.
-func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) {
+// Delete or ZIP all checkpoint files older than `from` together and write them to the `cleanupDir`,
+// deleting/moving them from the `checkpointsDir`.
+func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error) {
     entries1, err := os.ReadDir(checkpointsDir)
     if err != nil {
         return 0, err
@@ -107,7 +111,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
         go func() {
             defer wg.Done()
             for workItem := range work {
-                m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead)
+                m, err := cleanupCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead)
                 if err != nil {
                     cclog.Errorf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error())
                     atomic.AddInt32(&errs, 1)
@@ -125,7 +129,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
 
         for _, de2 := range entries2 {
             cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
-            adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
+            adir := filepath.Join(cleanupDir, de1.Name(), de2.Name())
             work <- workItem{
                 adir: adir, cdir: cdir,
                 cluster: de1.Name(), host: de2.Name(),
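The surrounding function fans work out over a channel of per-host work items consumed by a pool of goroutines, counting failures atomically; only fragments of that code appear in this diff. A minimal, generic sketch of that pattern, with a made-up processItem step, looks like this:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// workItem is a simplified stand-in for the per-cluster/per-host directories
// handled in the diff above.
type workItem struct {
    cdir, adir    string
    cluster, host string
}

func processItem(item workItem) error {
    // Placeholder for the actual archive/delete step.
    return nil
}

func main() {
    work := make(chan workItem)
    var wg sync.WaitGroup
    var errs, done int32

    // Fixed-size worker pool draining the channel.
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range work {
                if err := processItem(item); err != nil {
                    atomic.AddInt32(&errs, 1)
                    continue
                }
                atomic.AddInt32(&done, 1)
            }
        }()
    }

    // Producer: enqueue items, then close the channel so the workers exit.
    for _, h := range []string{"node01", "node02", "node03"} {
        work <- workItem{cdir: "./var/checkpoints/c/" + h, adir: "./var/archive/c/" + h, cluster: "c", host: h}
    }
    close(work)
    wg.Wait()
    fmt.Printf("done=%d errs=%d\n", done, errs)
}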
@@ -146,8 +150,8 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
     return int(n), nil
 }
 
-// Helper function for `ArchiveCheckpoints`.
-func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead bool) (int, error) {
+// Helper function for `CleanupCheckpoints`.
+func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead bool) (int, error) {
     entries, err := os.ReadDir(dir)
     if err != nil {
         return 0, err
@@ -171,10 +175,10 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead
         return n, nil
     }
 
-    filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from))
+    filename := filepath.Join(cleanupDir, fmt.Sprintf("%d.zip", from))
     f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
     if err != nil && os.IsNotExist(err) {
-        err = os.MkdirAll(archiveDir, CheckpointDirPerms)
+        err = os.MkdirAll(cleanupDir, CheckpointDirPerms)
         if err == nil {
             f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
         }
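The open-then-mkdir-then-retry sequence above is a common Go pattern: the target directory is created lazily only when the first write fails with "not exist". A standalone sketch of the same idea (with hypothetical permission constants standing in for CheckpointFilePerms/CheckpointDirPerms) might look like:

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

const (
    filePerms = 0o644 // stand-in for CheckpointFilePerms
    dirPerms  = 0o755 // stand-in for CheckpointDirPerms
)

// createInDir opens name for writing inside dir, creating dir on demand:
// the directory is only made (and the open retried) if the first attempt
// fails because the path does not exist yet.
func createInDir(dir, name string) (*os.File, error) {
    path := filepath.Join(dir, name)
    f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, filePerms)
    if err != nil && os.IsNotExist(err) {
        if mkErr := os.MkdirAll(dir, dirPerms); mkErr == nil {
            f, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY, filePerms)
        }
    }
    return f, err
}

func main() {
    f, err := createInDir("./var/archive/demo", "1700000000.zip")
    if err != nil {
        fmt.Println("open failed:", err)
        return
    }
    defer f.Close()
    fmt.Println("created", f.Name())
}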
@@ -17,10 +17,10 @@
 // │ ├─ FileFormat: "avro" or "json"
 // │ ├─ Interval: How often to save (e.g., "1h")
 // │ └─ RootDir: Checkpoint storage path
-// ├─ Archive: Long-term storage configuration
-// │ ├─ ArchiveInterval: How often to archive
+// ├─ Cleanup: Long-term storage configuration
+// │ ├─ Interval: How often to delete/archive
 // │ ├─ RootDir: Archive storage path
-// │ └─ DeleteInstead: Delete old data instead of archiving
+// │ └─ Mode: "delete" or "archive"
 // ├─ Debug: Development/debugging options
 // └─ Subscriptions: NATS topic subscriptions for metric ingestion
 //
@@ -48,12 +48,13 @@ import (
 )
 
 const (
     DefaultMaxWorkers = 10
     DefaultBufferCapacity = 512
     DefaultGCTriggerInterval = 100
     DefaultAvroWorkers = 4
     DefaultCheckpointBufferMin = 3
     DefaultAvroCheckpointInterval = time.Minute
+    DefaultMemoryUsageTrackerInterval = 1 * time.Hour
 )
 
 // Checkpoints configures periodic persistence of in-memory metric data.
@@ -86,10 +87,10 @@ type Debug struct {
 // - ArchiveInterval: Duration string (e.g., "24h") between archive operations
 // - RootDir: Filesystem path for archived data (created if missing)
 // - DeleteInstead: If true, delete old data instead of archiving (saves disk space)
-type Archive struct {
-    ArchiveInterval string `json:"interval"`
+type Cleanup struct {
+    Interval string `json:"interval"`
     RootDir string `json:"directory"`
-    DeleteInstead bool `json:"delete-instead"`
+    Mode string `json:"mode"`
 }
 
 // Subscriptions defines NATS topics to subscribe to for metric ingestion.
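As a quick illustration of how the renamed struct maps onto the new "cleanup" block from the sample configuration, the sketch below unmarshals that JSON fragment using the same field tags. The struct is copied here only for the example; in the repository it lives in the metric-store config package.

package main

import (
    "encoding/json"
    "fmt"
)

// Copy of the Cleanup shape from the diff, reproduced for a standalone example.
type Cleanup struct {
    Interval string `json:"interval"`
    RootDir  string `json:"directory"`
    Mode     string `json:"mode"`
}

func main() {
    raw := []byte(`{
        "mode": "archive",
        "interval": "48h",
        "directory": "./var/archive"
    }`)

    var c Cleanup
    if err := json.Unmarshal(raw, &c); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", c) // {Interval:48h RootDir:./var/archive Mode:archive}
}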
@@ -129,7 +130,7 @@ type MetricStoreConfig struct {
     MemoryCap int `json:"memory-cap"`
     Checkpoints Checkpoints `json:"checkpoints"`
     Debug *Debug `json:"debug"`
-    Archive *Archive `json:"archive"`
+    Cleanup *Cleanup `json:"cleanup"`
     Subscriptions *Subscriptions `json:"nats-subscriptions"`
 }
 
@@ -142,6 +143,9 @@ var Keys MetricStoreConfig = MetricStoreConfig{
         FileFormat: "avro",
         RootDir: "./var/checkpoints",
     },
+    Cleanup: &Cleanup{
+        Mode: "delete",
+    },
 }
 
 // AggregationStrategy defines how to combine metric values across hierarchy levels.
@@ -32,20 +32,32 @@ const configSchema = `{
     },
     "required": ["interval"]
 },
-"archive": {
-    "description": "Configuration for archiving the already checkpointed files.",
+"cleanup": {
+    "description": "Configuration for the cleanup process.",
     "type": "object",
     "properties": {
+        "mode": {
+            "description": "The operation mode (e.g., 'archive' or 'delete').",
+            "type": "string",
+            "enum": ["archive", "delete"]
+        },
         "interval": {
-            "description": "Interval at which the checkpointed files should be archived.",
+            "description": "Interval at which the cleanup runs.",
             "type": "string"
         },
         "directory": {
-            "description": "Specify the directy in which the archived files should be placed.",
+            "description": "Target directory for operations.",
             "type": "string"
         }
     },
-    "required": ["interval", "directory"]
+    "if": {
+        "properties": {
+            "mode": { "const": "archive" }
+        }
+    },
+    "then": {
+        "required": ["interval", "directory"]
+    }
 },
 "retention-in-memory": {
     "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
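The "if"/"then" keywords make "interval" and "directory" mandatory only when "mode" is "archive"; in delete mode the block can stay minimal. A plain-Go restatement of that rule (a sketch of the schema's intent, not the validator the project actually uses) is:

package main

import (
    "errors"
    "fmt"
)

// validateCleanup re-states the schema's conditional requirement in plain Go:
// when mode is "archive", both interval and directory must be present.
func validateCleanup(mode, interval, directory string) error {
    if mode != "" && mode != "archive" && mode != "delete" {
        return fmt.Errorf("cleanup.mode must be 'archive' or 'delete', got %q", mode)
    }
    if mode == "archive" && (interval == "" || directory == "") {
        return errors.New("cleanup: 'interval' and 'directory' are required when mode is 'archive'")
    }
    return nil
}

func main() {
    fmt.Println(validateCleanup("archive", "48h", "./var/archive")) // <nil>
    fmt.Println(validateCleanup("archive", "48h", ""))              // error: directory required
    fmt.Println(validateCleanup("delete", "", ""))                  // <nil>: nothing required in delete mode
}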
@@ -200,7 +200,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 
     Retention(wg, ctx)
     Checkpointing(wg, ctx)
-    Archiving(wg, ctx)
+    CleanUp(wg, ctx)
     DataStaging(wg, ctx)
     MemoryUsageTracker(wg, ctx)
 
@@ -386,7 +386,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
 
     go func() {
         defer wg.Done()
-        d := 1 * time.Minute
+        d := DefaultMemoryUsageTrackerInterval
 
         if d <= 0 {
             return
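The tracker's body is not part of this diff, but the new interval constant suggests the usual ticker-with-context loop. A generic sketch of that pattern under that assumption, using a stand-in constant and a placeholder log line instead of the project's cclog output, could look like this:

package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"
)

const defaultMemoryUsageTrackerInterval = 1 * time.Hour // mirrors the new constant

// trackMemoryUsage periodically reports process memory stats until ctx is cancelled.
func trackMemoryUsage(wg *sync.WaitGroup, ctx context.Context, d time.Duration) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if d <= 0 {
            return
        }
        ticker := time.NewTicker(d)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                var m runtime.MemStats
                runtime.ReadMemStats(&m)
                fmt.Printf("heap in use: %d MiB\n", m.HeapInuse/1024/1024)
            }
        }
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    trackMemoryUsage(&wg, ctx, 10*time.Millisecond) // short interval just for the demo
    time.Sleep(50 * time.Millisecond)
    cancel()
    wg.Wait()
}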