// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

// Package archive implements the job archive interface and various backend implementations.
//
// The archive package provides a pluggable storage backend system for job metadata and performance data.
// It supports three backend types:
//
//   - file: Filesystem-based storage with hierarchical directory structure
//   - s3: AWS S3 and S3-compatible object storage (MinIO, localstack)
//   - sqlite: Single-file SQLite database with BLOB storage
//
// # Backend Selection
//
// Choose a backend based on your deployment requirements:
//
//   - File: Best for single-server deployments with local fast storage
//   - S3: Best for distributed deployments requiring redundancy and multi-instance access
//   - SQLite: Best for portable archives with SQL query capability and transactional integrity
//
// # Configuration
//
// The archive backend is configured via JSON in the application config file:
//
//	{
//	  "archive": {
//	    "kind": "file",             // or "s3" or "sqlite"
//	    "path": "/var/lib/archive"  // for file backend
//	  }
//	}
//
// For the S3 backend:
//
//	{
//	  "archive": {
//	    "kind": "s3",
//	    "bucket": "my-job-archive",
//	    "region": "us-east-1",
//	    "accessKey": "...",
//	    "secretKey": "..."
//	  }
//	}
//
// For the SQLite backend:
//
//	{
//	  "archive": {
//	    "kind": "sqlite",
//	    "dbPath": "/var/lib/archive.db"
//	  }
//	}
//
// # Usage
//
// The package is initialized once at application startup:
//
//	err := archive.Init(rawConfig, false)
//	if err != nil {
//	    log.Fatal(err)
//	}
//
// After initialization, use the global functions to interact with the archive:
//
//	// Check if a job exists
//	exists := archive.GetHandle().Exists(job)
//
//	// Load job metadata
//	jobMeta, err := archive.GetHandle().LoadJobMeta(job)
//
//	// Store job metadata
//	err = archive.GetHandle().StoreJobMeta(job)
//
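// The time-series performance data for a job can be loaded in the same way
// (illustrative; LoadJobData is part of the ArchiveBackend interface):
//
//	// Load job performance data
//	jobData, err := archive.GetHandle().LoadJobData(job)
//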
// # Thread Safety
//
// All backend implementations are safe for concurrent use. The package uses
// internal locking for operations that modify shared state.
package archive

import (
	"encoding/json"
	"fmt"
	"maps"
	"sync"

	"github.com/ClusterCockpit/cc-backend/internal/config"
	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
	"github.com/ClusterCockpit/cc-lib/lrucache"
	"github.com/ClusterCockpit/cc-lib/schema"
)

// Version is the current archive schema version.
// The archive backend must match this version for compatibility.
const Version uint64 = 3

// ArchiveBackend defines the interface that all archive storage backends must implement.
// Implementations include FsArchive (filesystem), S3Archive (object storage), and SqliteArchive (database).
//
// All methods are safe for concurrent use unless otherwise noted.
type ArchiveBackend interface {
	// Init initializes the archive backend with the provided configuration.
	// Returns the archive version found in the backend storage.
	// Returns an error if the version is incompatible or initialization fails.
	Init(rawConfig json.RawMessage) (uint64, error)

	// Info prints archive statistics to stdout, including job counts,
	// date ranges, and storage sizes per cluster.
	Info()

	// Exists checks if a job with the given ID, cluster, and start time
	// exists in the archive.
	Exists(job *schema.Job) bool

	// LoadJobMeta loads job metadata from the archive.
	// Returns the complete Job structure including resources, tags, and statistics.
	LoadJobMeta(job *schema.Job) (*schema.Job, error)

	// LoadJobData loads the complete time-series performance data for a job.
	// Returns a map of metric names to their scoped data (node, socket, core, etc.).
	LoadJobData(job *schema.Job) (schema.JobData, error)

	// LoadJobStats loads pre-computed statistics from the job data.
	// Returns scoped statistics (min, max, avg) for all metrics.
	LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

	// LoadClusterCfg loads the cluster configuration.
	// Returns the cluster topology, metrics, and hardware specifications.
	LoadClusterCfg(name string) (*schema.Cluster, error)

	// StoreJobMeta stores job metadata to the archive.
	// Overwrites existing metadata for the same job ID, cluster, and start time.
	StoreJobMeta(jobMeta *schema.Job) error

	// ImportJob stores both job metadata and performance data to the archive.
	// This is typically used during initial job archiving.
	ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error

	// GetClusters returns a list of all cluster names found in the archive.
	GetClusters() []string

	// CleanUp removes the specified jobs from the archive.
	// Used by retention policies to delete old jobs.
	CleanUp(jobs []*schema.Job)

	// Move relocates jobs to a different path within the archive.
	// The implementation depends on the backend type.
	Move(jobs []*schema.Job, path string)

	// Clean removes jobs outside the specified time range.
	// Jobs with start_time < before OR start_time > after are deleted.
	// Set after=0 to only use the before parameter.
	Clean(before int64, after int64)

	// Compress compresses job data files to save storage space.
	// For filesystem and SQLite backends, this applies gzip compression.
	// For S3, this compresses and replaces objects.
	Compress(jobs []*schema.Job)

	// CompressLast returns the timestamp of the last compression run
	// and updates it to the provided starttime.
	CompressLast(starttime int64) int64

	// Iter returns a channel that yields all jobs in the archive.
	// If loadMetricData is true, includes performance data; otherwise only metadata.
	// The channel is closed when iteration completes.
	Iter(loadMetricData bool) <-chan JobContainer
}

// JobContainer combines job metadata and optional performance data.
// Used by Iter() to yield jobs during archive iteration.
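//
// Illustrative sketch (assumes schema.Job exposes a Cluster field): count
// archived jobs per cluster without loading metric data:
//
//	counts := map[string]int{}
//	for jc := range archive.GetHandle().Iter(false) {
//	    counts[jc.Meta.Cluster]++
//	}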
type JobContainer struct {
	Meta *schema.Job     // Job metadata (always present)
	Data *schema.JobData // Performance data (nil if not loaded)
}

var (
	initOnce   sync.Once
	cache      *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
	ar         ArchiveBackend
	useArchive bool
	mutex      sync.Mutex
)

// Init initializes the archive backend with the provided configuration.
// Must be called once at application startup before using any archive functions.
//
// Parameters:
//   - rawConfig: JSON configuration for the archive backend
//   - disableArchive: if true, disables archive functionality
//
// The configuration determines which backend is used (file, s3, or sqlite).
// Returns an error if initialization fails or version is incompatible.
func Init(rawConfig json.RawMessage, disableArchive bool) error {
	var err error

	initOnce.Do(func() {
		useArchive = !disableArchive

		var cfg struct {
			Kind string `json:"kind"`
		}

		config.Validate(configSchema, rawConfig)
		if err = json.Unmarshal(rawConfig, &cfg); err != nil {
			cclog.Warn("Error while unmarshaling raw config json")
			return
		}

		switch cfg.Kind {
		case "file":
			ar = &FsArchive{}
		case "s3":
			ar = &S3Archive{}
		case "sqlite":
			ar = &SqliteArchive{}
		default:
			err = fmt.Errorf("ARCHIVE/ARCHIVE > unknown archive backend '%s'", cfg.Kind)
			// Return early so a nil backend is never initialized below.
			return
		}

		var version uint64
		version, err = ar.Init(rawConfig)
		if err != nil {
			cclog.Errorf("Error while initializing archiveBackend: %s", err.Error())
			return
		}
		cclog.Infof("Load archive version %d", version)

		err = initClusterConfig()
	})

	return err
}

// GetHandle returns the initialized archive backend instance.
// Must be called after Init().
func GetHandle() ArchiveBackend {
	return ar
}

// LoadAveragesFromArchive loads average metric values for a job from the archive.
// This is a helper function that extracts average values from job statistics.
//
// Parameters:
//   - job: Job to load averages for
//   - metrics: List of metric names to retrieve
//   - data: 2D slice where averages will be appended (one row per metric)
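//
// Illustrative sketch (the metric names and the jobs slice are placeholders):
// the caller allocates one row per metric and may pass the same slices for
// several jobs, so averages accumulate column by column:
//
//	metrics := []string{"flops_any", "mem_bw"}
//	data := make([][]schema.Float, len(metrics))
//	for _, job := range jobs {
//	    _ = archive.LoadAveragesFromArchive(job, metrics, data)
//	}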
func LoadAveragesFromArchive(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
) error {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		cclog.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

	for i, m := range metrics {
		if stat, ok := metaFile.Statistics[m]; ok {
			data[i] = append(data[i], schema.Float(stat.Avg))
		} else {
			data[i] = append(data[i], schema.NaN)
		}
	}

	return nil
}

// LoadStatsFromArchive loads metric statistics for a job from the archive.
// Returns a map of metric names to their statistics (min, max, avg).
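//
// Illustrative call (the metric name is a placeholder):
//
//	stats, err := archive.LoadStatsFromArchive(job, []string{"flops_any"})
//	if err == nil {
//	    fmt.Println(stats["flops_any"].Avg)
//	}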
func LoadStatsFromArchive(
	job *schema.Job,
	metrics []string,
) (map[string]schema.MetricStatistics, error) {
	data := make(map[string]schema.MetricStatistics, len(metrics))
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		cclog.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return data, err
	}

	for _, m := range metrics {
		stat, ok := metaFile.Statistics[m]
		if !ok {
			data[m] = schema.MetricStatistics{Min: 0.0, Avg: 0.0, Max: 0.0}
			continue
		}

		data[m] = schema.MetricStatistics{
			Avg: stat.Avg,
			Min: stat.Min,
			Max: stat.Max,
		}
	}

	return data, nil
}

// LoadScopedStatsFromArchive loads scoped statistics for a job from the archive.
// Returns statistics organized by metric scope (node, socket, core, etc.).
func LoadScopedStatsFromArchive(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
) (schema.ScopedJobStats, error) {
	data, err := ar.LoadJobStats(job)
	if err != nil {
		cclog.Errorf("Error while loading job stats from archiveBackend: %s", err.Error())
		return nil, err
	}

	return data, nil
}

// GetStatistics returns all metric statistics for a job.
// Returns a map of metric names to their job-level statistics.
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		cclog.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return nil, err
	}

	return metaFile.Statistics, nil
}

// UpdateMetadata updates the metadata map for an archived job.
// If the job is still running or archiving is disabled, this is a no-op.
//
// This function is safe for concurrent use (protected by mutex).
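//
// Illustrative call (the key/value pair is a placeholder): attach a free-form
// note to an already archived job:
//
//	err := archive.UpdateMetadata(job, map[string]string{"note": "reference run"})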
func UpdateMetadata(job *schema.Job, metadata map[string]string) error {
	mutex.Lock()
	defer mutex.Unlock()

	if job.State == schema.JobStateRunning || !useArchive {
		return nil
	}

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		cclog.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

	maps.Copy(jobMeta.MetaData, metadata)

	return ar.StoreJobMeta(jobMeta)
}

// UpdateTags updates the tag list for an archived job.
// If the job is still running or archiving is disabled, this is a no-op.
//
// This function is safe for concurrent use (protected by mutex).
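//
// Illustrative call (the tag values are placeholders); the provided list
// replaces the tags stored in the archive:
//
//	err := archive.UpdateTags(job, []*schema.Tag{
//	    {Name: "highmem", Type: "app", Scope: "global"},
//	})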
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
	mutex.Lock()
	defer mutex.Unlock()

	if job.State == schema.JobStateRunning || !useArchive {
		return nil
	}

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		cclog.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

	jobMeta.Tags = make([]*schema.Tag, 0)
	for _, tag := range tags {
		jobMeta.Tags = append(jobMeta.Tags, &schema.Tag{
			Name:  tag.Name,
			Type:  tag.Type,
			Scope: tag.Scope,
		})
	}

	return ar.StoreJobMeta(jobMeta)
}