Port to cc-lib. Extend legal header.

2025-06-30 12:06:35 +02:00
parent 544fb35121
commit 639e1b9c6d
120 changed files with 1140 additions and 6410 deletions
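This commit swaps the cc-backend packages pkg/log, pkg/schema, and internal/util for their cc-lib counterparts (ccLogger, schema, util) and extends the copyright header. As a minimal sketch of the logging migration pattern, assuming only the cclog alias and the leveled calls visible in the diff below (the file name and message here are illustrative):

// sketch.go - illustrative, not part of the commit
package main

import (
	"os"

	// Before: "github.com/ClusterCockpit/cc-backend/pkg/log", called as log.*
	// After: cc-lib's logger, aliased so call sites read cclog.*
	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)

func main() {
	if _, err := os.ReadFile("version.txt"); err != nil {
		// Old call site: log.Warnf("fsBackend Init() - %v", err)
		cclog.Warnf("fsBackend Init() - %v", err)
	}
}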


@@ -1,5 +1,5 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
-// All rights reserved.
+// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
@@ -21,9 +21,9 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/ClusterCockpit/cc-lib/util"
"github.com/santhosh-tekuri/jsonschema/v5"
)
@@ -68,7 +68,7 @@ func getPath(
func loadJobMeta(filename string) (*schema.Job, error) {
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err)
cclog.Errorf("loadJobMeta() > open file error: %v", err)
return nil, err
}
if config.Keys.Validate {
@@ -83,7 +83,7 @@ func loadJobMeta(filename string) (*schema.Job, error) {
func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
cclog.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
}
defer f.Close()
@@ -91,7 +91,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
cclog.Errorf(" %v", err)
return nil, err
}
defer r.Close()
@@ -116,7 +116,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobStats()- %v", err)
cclog.Errorf("fsBackend LoadJobStats()- %v", err)
return nil, err
}
defer f.Close()
@@ -124,7 +124,7 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er
if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
cclog.Errorf(" %v", err)
return nil, err
}
defer r.Close()
@@ -149,25 +149,25 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warnf("Init() > Unmarshal error: %#v", err)
cclog.Warnf("Init() > Unmarshal error: %#v", err)
return 0, err
}
if config.Path == "" {
err := fmt.Errorf("Init() : empty config.Path")
log.Errorf("Init() > config.Path error: %v", err)
cclog.Errorf("Init() > config.Path error: %v", err)
return 0, err
}
fsa.path = config.Path
b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
if err != nil {
log.Warnf("fsBackend Init() - %v", err)
cclog.Warnf("fsBackend Init() - %v", err)
return 0, err
}
version, err := strconv.ParseUint(strings.TrimSuffix(string(b), "\n"), 10, 64)
if err != nil {
log.Errorf("fsBackend Init()- %v", err)
cclog.Errorf("fsBackend Init()- %v", err)
return 0, err
}
@@ -177,7 +177,7 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
entries, err := os.ReadDir(fsa.path)
if err != nil {
log.Errorf("Init() > ReadDir() error: %v", err)
cclog.Errorf("Init() > ReadDir() error: %v", err)
return 0, err
}
@@ -195,7 +195,7 @@ func (fsa *FsArchive) Info() {
fmt.Printf("Job archive %s\n", fsa.path)
clusters, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
cclog.Fatalf("Reading clusters failed: %s", err.Error())
}
ci := make(map[string]*clusterInfo)
@@ -209,7 +209,7 @@ func (fsa *FsArchive) Info() {
ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()}
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
@@ -218,14 +218,14 @@ func (fsa *FsArchive) Info() {
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
@@ -233,7 +233,7 @@ func (fsa *FsArchive) Info() {
ci[cc].numJobs++
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
if err != nil {
log.Fatalf("Cannot parse starttime: %s", err.Error())
cclog.Fatalf("Cannot parse starttime: %s", err.Error())
}
ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime)
ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime)
@@ -278,7 +278,7 @@ func (fsa *FsArchive) Clean(before int64, after int64) {
clusters, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
cclog.Fatalf("Reading clusters failed: %s", err.Error())
}
for _, cluster := range clusters {
@@ -288,7 +288,7 @@ func (fsa *FsArchive) Clean(before int64, after int64) {
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
@@ -297,33 +297,33 @@ func (fsa *FsArchive) Clean(before int64, after int64) {
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
if err != nil {
log.Fatalf("Cannot parse starttime: %s", err.Error())
cclog.Fatalf("Cannot parse starttime: %s", err.Error())
}
if startTime < before || startTime > after {
if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil {
log.Errorf("JobArchive Cleanup() error: %v", err)
cclog.Errorf("JobArchive Cleanup() error: %v", err)
}
}
}
}
if util.GetFilecount(dirpath) == 0 {
if err := os.Remove(dirpath); err != nil {
log.Errorf("JobArchive Clean() error: %v", err)
cclog.Errorf("JobArchive Clean() error: %v", err)
}
}
}
@@ -337,16 +337,16 @@ func (fsa *FsArchive) Move(jobs []*schema.Job, path string) {
target := getDirectory(job, path)
if err := os.MkdirAll(filepath.Clean(filepath.Join(target, "..")), 0777); err != nil {
log.Errorf("JobArchive Move MkDir error: %v", err)
cclog.Errorf("JobArchive Move MkDir error: %v", err)
}
if err := os.Rename(source, target); err != nil {
log.Errorf("JobArchive Move() error: %v", err)
cclog.Errorf("JobArchive Move() error: %v", err)
}
parent := filepath.Clean(filepath.Join(source, ".."))
if util.GetFilecount(parent) == 0 {
if err := os.Remove(parent); err != nil {
log.Errorf("JobArchive Move() error: %v", err)
cclog.Errorf("JobArchive Move() error: %v", err)
}
}
}
@@ -357,18 +357,18 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
for _, job := range jobs {
dir := getDirectory(job, fsa.path)
if err := os.RemoveAll(dir); err != nil {
log.Errorf("JobArchive Cleanup() error: %v", err)
cclog.Errorf("JobArchive Cleanup() error: %v", err)
}
parent := filepath.Clean(filepath.Join(dir, ".."))
if util.GetFilecount(parent) == 0 {
if err := os.Remove(parent); err != nil {
log.Errorf("JobArchive Cleanup() error: %v", err)
cclog.Errorf("JobArchive Cleanup() error: %v", err)
}
}
}
log.Infof("Retention Service - Remove %d files in %s", len(jobs), time.Since(start))
cclog.Infof("Retention Service - Remove %d files in %s", len(jobs), time.Since(start))
}
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
@@ -383,24 +383,24 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
}
}
log.Infof("Compression Service - %d files took %s", cnt, time.Since(start))
cclog.Infof("Compression Service - %d files took %s", cnt, time.Since(start))
}
func (fsa *FsArchive) CompressLast(starttime int64) int64 {
filename := filepath.Join(fsa.path, "compress.txt")
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("fsBackend Compress - %v", err)
cclog.Errorf("fsBackend Compress - %v", err)
os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644)
return starttime
}
last, err := strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64)
if err != nil {
log.Errorf("fsBackend Compress - %v", err)
cclog.Errorf("fsBackend Compress - %v", err)
return starttime
}
log.Infof("fsBackend Compress - start %d last %d", starttime, last)
cclog.Infof("fsBackend Compress - start %d last %d", starttime, last)
os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644)
return last
}
@@ -437,10 +437,10 @@ func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.Job, error) {
func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
log.Errorf("LoadClusterCfg() > open file error: %v", err)
cclog.Errorf("LoadClusterCfg() > open file error: %v", err)
// if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
log.Warnf("Validate cluster config: %v\n", err)
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
}
@@ -453,7 +453,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
go func() {
clustersDir, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
cclog.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
}
for _, clusterDir := range clustersDir {
@@ -462,7 +462,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
}
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
@@ -473,21 +473,21 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
cclog.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
}
if loadMetricData {
@@ -501,10 +501,10 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
data, err := loadJobData(filename, isCompressed)
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
}
ch <- JobContainer{Meta: job, Data: &data}
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else {
ch <- JobContainer{Meta: job, Data: nil}
}
@@ -521,15 +521,15 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
func (fsa *FsArchive) StoreJobMeta(job *schema.Job) error {
f, err := os.Create(getPath(job, fsa.path, "meta.json"))
if err != nil {
log.Error("Error while creating filepath for meta.json")
cclog.Error("Error while creating filepath for meta.json")
return err
}
if err := EncodeJobMeta(f, job); err != nil {
log.Error("Error while encoding job metadata to meta.json file")
cclog.Error("Error while encoding job metadata to meta.json file")
return err
}
if err := f.Close(); err != nil {
log.Warn("Error while closing meta.json file")
cclog.Warn("Error while closing meta.json file")
return err
}
@@ -546,35 +546,35 @@ func (fsa *FsArchive) ImportJob(
) error {
dir := getPath(jobMeta, fsa.path, "")
if err := os.MkdirAll(dir, 0777); err != nil {
log.Error("Error while creating job archive path")
cclog.Error("Error while creating job archive path")
return err
}
f, err := os.Create(path.Join(dir, "meta.json"))
if err != nil {
log.Error("Error while creating filepath for meta.json")
cclog.Error("Error while creating filepath for meta.json")
return err
}
if err := EncodeJobMeta(f, jobMeta); err != nil {
log.Error("Error while encoding job metadata to meta.json file")
cclog.Error("Error while encoding job metadata to meta.json file")
return err
}
if err := f.Close(); err != nil {
log.Warn("Error while closing meta.json file")
cclog.Warn("Error while closing meta.json file")
return err
}
f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
log.Error("Error while creating filepath for data.json")
cclog.Error("Error while creating filepath for data.json")
return err
}
if err := EncodeJobData(f, jobData); err != nil {
log.Error("Error while encoding job metricdata to data.json file")
cclog.Error("Error while encoding job metricdata to data.json file")
return err
}
if err := f.Close(); err != nil {
log.Warn("Error while closing data.json file")
cclog.Warn("Error while closing data.json file")
}
return err
}
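A usage note on the helper package: util.Min and util.Max, used by FsArchive.Info above, now resolve to cc-lib's util instead of cc-backend's internal/util. A minimal sketch, assuming the two-argument int64 form seen in the diff:

// sketch.go - illustrative, not part of the commit
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-lib/util"
)

func main() {
	// Mirrors the dateFirst/dateLast bookkeeping in FsArchive.Info;
	// timestamps are illustrative Unix seconds.
	var dateFirst, startTime int64 = 1719741600, 1700000000
	fmt.Println(util.Min(dateFirst, startTime), util.Max(dateFirst, startTime))
}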