// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
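
// Package archive provides the job archive abstraction of cc-backend: the
// ArchiveBackend interface, its initialization from the JSON configuration,
// and helpers for reading and updating archived job metadata.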
package archive

import (
	"encoding/json"
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
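
// Version is the job archive version supported by this package; a backend's
// Init() reports the version it actually finds in the archive.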
const Version uint64 = 1
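
// ArchiveBackend is the interface a job archive implementation has to
// provide. The file-system backend (FsArchive) is currently the only one
// wired up in Init(); an S3 backend is sketched there but commented out.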
type ArchiveBackend interface {
	Init(rawConfig json.RawMessage) (uint64, error)

	Info()

	Exists(job *schema.Job) bool

	LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)

	LoadJobData(job *schema.Job) (schema.JobData, error)

	LoadClusterCfg(name string) (*schema.Cluster, error)

	StoreJobMeta(jobMeta *schema.JobMeta) error

	ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error

	GetClusters() []string

	CleanUp(jobs []*schema.Job)

	Move(jobs []*schema.Job, path string)

	Clean(before int64, after int64)

	Compress(jobs []*schema.Job)

	CompressLast(starttime int64) int64

	Iter(loadMetricData bool) <-chan JobContainer
}
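
// JobContainer bundles a job's metadata and, when requested via Iter's
// loadMetricData flag, its metric data.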
type JobContainer struct {
	Meta *schema.JobMeta
	Data *schema.JobData
}
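
// Package-level state: an in-memory LRU cache (128 MB), the currently
// selected archive backend, and a flag recording whether use of the archive
// is enabled.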
var (
	cache      *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
	ar         ArchiveBackend
	useArchive bool
)
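
// Init selects the archive backend named by the "kind" field of rawConfig and
// passes the full raw configuration on to that backend's own Init(). All
// fields other than "kind" are backend specific; a minimal sketch of a file
// backend configuration could look like:
//
//	{ "kind": "file", "path": "./var/job-archive" }
//
// If disableArchive is true the backend is still initialized, but update
// helpers such as UpdateTags() and UpdateMetadata() become no-ops.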
func Init(rawConfig json.RawMessage, disableArchive bool) error {
	useArchive = !disableArchive

	var cfg struct {
		Kind string `json:"kind"`
	}

	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		log.Warn("Error while unmarshaling raw config json")
		return err
	}

	switch cfg.Kind {
	case "file":
		ar = &FsArchive{}
	// case "s3":
	// 	ar = &S3Archive{}
	default:
		return fmt.Errorf("ARCHIVE/ARCHIVE > unknown archive backend '%s'", cfg.Kind)
	}

	version, err := ar.Init(rawConfig)
	if err != nil {
		log.Error("Error while initializing archiveBackend")
		return err
	}
	log.Infof("Load archive version %d", version)

	return initClusterConfig()
}
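
// GetHandle returns the currently active archive backend.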
func GetHandle() ArchiveBackend {
	return ar
}

// LoadAveragesFromArchive is a helper for metricdata.LoadAverages(). It
// appends the archived average of each requested metric (NaN if the metric is
// missing) to the corresponding row of data.
func LoadAveragesFromArchive(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
) error {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		return err
	}

	for i, m := range metrics {
		if stat, ok := metaFile.Statistics[m]; ok {
			data[i] = append(data[i], schema.Float(stat.Avg))
		} else {
			data[i] = append(data[i], schema.NaN)
		}
	}

	return nil
}
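
// GetStatistics returns the per-metric statistics stored in the archived
// metadata of the given job.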
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		return nil, err
	}

	return metaFile.Statistics, nil
}

// UpdateMetadata writes the given metadata keys into the `meta.json` file of
// an archived job. If the job is still running or archiving is disabled,
// nothing is done.
func UpdateMetadata(job *schema.Job, metadata map[string]string) error {
	if job.State == schema.JobStateRunning || !useArchive {
		return nil
	}

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		return err
	}

	for k, v := range metadata {
		jobMeta.MetaData[k] = v
	}

	return ar.StoreJobMeta(jobMeta)
}

// UpdateTags replaces the tag list in the `meta.json` file of an archived job
// with the given tags. If the job is still running or archiving is disabled,
// nothing is done.
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
	if job.State == schema.JobStateRunning || !useArchive {
		return nil
	}

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		return err
	}

	jobMeta.Tags = make([]*schema.Tag, 0)
	for _, tag := range tags {
		jobMeta.Tags = append(jobMeta.Tags, &schema.Tag{
			Name: tag.Name,
			Type: tag.Type,
		})
	}

	return ar.StoreJobMeta(jobMeta)
}