diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index cab4c24f..784348b5 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -158,8 +158,7 @@ func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead return 0, err } - extension := Keys.Checkpoints.FileFormat - files, err := findFiles(entries, from, extension, false) + files, err := findFiles(entries, from, false) if err != nil { return 0, err } diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 715566e4..b4097ff2 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -415,7 +415,7 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { // // Uses worker pool to load cluster/host combinations. Periodically triggers GC // to prevent excessive heap growth. Returns number of files loaded and any errors. -func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { +func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) @@ -426,7 +426,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( defer wg.Done() for host := range work { lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) - nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) + nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from) if err != nil { cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error()) atomic.AddInt32(&errs, 1) @@ -465,57 +465,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir) } - // Config read (replace with your actual config read) - fileFormat := Keys.Checkpoints.FileFormat - if fileFormat == "" { - fileFormat = "avro" - } - - // Map to easily get the fallback format - oppositeFormat := map[string]string{ - "json": "avro", - "avro": "json", - } - - // First, attempt to load the specified format - if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat) - return m.FromCheckpoint(dir, from, fileFormat) - } - - // If not found, attempt the opposite format - altFormat := oppositeFormat[fileFormat] - if found, err := checkFilesWithExtension(dir, altFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat) - return m.FromCheckpoint(dir, from, altFormat) - } - - return 0, nil -} - -// checkFilesWithExtension walks a directory tree to check if files with the given extension exist. -func checkFilesWithExtension(dir string, extension string) (bool, error) { - found := false - - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("[METRICSTORE]> error accessing path %s: %v", path, err) - } - if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension { - found = true - return nil - } - return nil - }) - if err != nil { - return false, fmt.Errorf("[METRICSTORE]> error walking through directories: %s", err) - } - - return found, nil + return m.FromCheckpoint(dir, from) } func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { @@ -729,7 +679,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { return nil } -func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) { +func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { @@ -748,33 +698,38 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension children: make(map[string]*Level), } - files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension) + files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from) filesLoaded += files if err != nil { return filesLoaded, err } l.children[e.Name()] = child - } else if strings.HasSuffix(e.Name(), "."+extension) { + } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { allFiles = append(allFiles, e) } else { continue } } - files, err := findFiles(allFiles, from, extension, true) + files, err := findFiles(allFiles, from, true) if err != nil { return filesLoaded, err } loaders := map[string]func(*MemoryStore, *os.File, int64) error{ - "json": l.loadJSONFile, - "avro": l.loadAvroFile, + ".json": l.loadJSONFile, + ".avro": l.loadAvroFile, } - loader := loaders[extension] - for _, filename := range files { + ext := filepath.Ext(filename) + loader := loaders[ext] + if loader == nil { + cclog.Warnf("Unknown extension for file %s", filename) + continue + } + // Use a closure to ensure file is closed immediately after use err := func() error { f, err := os.Open(path.Join(dir, filename)) @@ -798,10 +753,12 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension // This will probably get very slow over time! // A solution could be some sort of an index file in which all other files // and the timespan they contain is listed. -func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) { +// NOTE: This now assumes that you have distinct timestamps for json and avro files +// Also, it assumes that the timestamps are not overlapping/self-modified. +func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { - if !strings.HasSuffix(e.Name(), "."+extension) { + if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { continue }