Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev

This commit is contained in:
2026-02-03 18:35:35 +01:00
2 changed files with 21 additions and 65 deletions

View File

@@ -158,8 +158,7 @@ func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead
return 0, err
}
extension := Keys.Checkpoints.FileFormat
files, err := findFiles(entries, from, extension, false)
files, err := findFiles(entries, from, false)
if err != nil {
return 0, err
}

View File

@@ -415,7 +415,7 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error {
//
// Uses worker pool to load cluster/host combinations. Periodically triggers GC
// to prevent excessive heap growth. Returns number of files loaded and any errors.
func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) {
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
var wg sync.WaitGroup
work := make(chan [2]string, Keys.NumWorkers*4)
n, errs := int32(0), int32(0)
@@ -426,7 +426,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (
defer wg.Done()
for host := range work {
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension)
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from)
if err != nil {
cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error())
atomic.AddInt32(&errs, 1)
@@ -465,57 +465,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir)
}
// Config read (replace with your actual config read)
fileFormat := Keys.Checkpoints.FileFormat
if fileFormat == "" {
fileFormat = "avro"
}
// Map to easily get the fallback format
oppositeFormat := map[string]string{
"json": "avro",
"avro": "json",
}
// First, attempt to load the specified format
if found, err := checkFilesWithExtension(dir, fileFormat); err != nil {
return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
} else if found {
cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat)
return m.FromCheckpoint(dir, from, fileFormat)
}
// If not found, attempt the opposite format
altFormat := oppositeFormat[fileFormat]
if found, err := checkFilesWithExtension(dir, altFormat); err != nil {
return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
} else if found {
cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat)
return m.FromCheckpoint(dir, from, altFormat)
}
return 0, nil
}
// checkFilesWithExtension walks a directory tree to check if files with the given extension exist.
func checkFilesWithExtension(dir string, extension string) (bool, error) {
found := false
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("[METRICSTORE]> error accessing path %s: %v", path, err)
}
if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension {
found = true
return nil
}
return nil
})
if err != nil {
return false, fmt.Errorf("[METRICSTORE]> error walking through directories: %s", err)
}
return found, nil
return m.FromCheckpoint(dir, from)
}
func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
@@ -729,7 +679,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
return nil
}
func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) {
func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
@@ -748,33 +698,38 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension
children: make(map[string]*Level),
}
files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension)
files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from)
filesLoaded += files
if err != nil {
return filesLoaded, err
}
l.children[e.Name()] = child
} else if strings.HasSuffix(e.Name(), "."+extension) {
} else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") {
allFiles = append(allFiles, e)
} else {
continue
}
}
files, err := findFiles(allFiles, from, extension, true)
files, err := findFiles(allFiles, from, true)
if err != nil {
return filesLoaded, err
}
loaders := map[string]func(*MemoryStore, *os.File, int64) error{
"json": l.loadJSONFile,
"avro": l.loadAvroFile,
".json": l.loadJSONFile,
".avro": l.loadAvroFile,
}
loader := loaders[extension]
for _, filename := range files {
ext := filepath.Ext(filename)
loader := loaders[ext]
if loader == nil {
cclog.Warnf("Unknown extension for file %s", filename)
continue
}
// Use a closure to ensure file is closed immediately after use
err := func() error {
f, err := os.Open(path.Join(dir, filename))
@@ -798,10 +753,12 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension
// This will probably get very slow over time!
// A solution could be some sort of an index file in which all other files
// and the timespan they contain is listed.
func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) {
// NOTE: This now assumes that you have distinct timestamps for json and avro files
// Also, it assumes that the timestamps are not overlapping/self-modified.
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
nums := map[string]int64{}
for _, e := range direntries {
if !strings.HasSuffix(e.Name(), "."+extension) {
if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") {
continue
}