From 415467967d6c1b1ce08848250cc2fe25dd547361 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 19 Feb 2026 08:40:37 +0100 Subject: [PATCH] Apply optimizations to checkpoint loading --- pkg/metricstore/checkpoint.go | 412 ++++++++++++++++++++++++--------- pkg/metricstore/config.go | 1 - pkg/metricstore/metricstore.go | 17 +- 3 files changed, 310 insertions(+), 120 deletions(-) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index b4097ff2..cf28e12e 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -56,9 +56,8 @@ import ( ) const ( - CheckpointFilePerms = 0o644 // File permissions for checkpoint files - CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories - GCTriggerInterval = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading + CheckpointFilePerms = 0o644 // File permissions for checkpoint files + CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories ) // CheckpointMetrics represents metric data in a checkpoint file. @@ -165,6 +164,153 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } } +// UnmarshalJSON provides optimized JSON decoding for CheckpointMetrics. +// +// Mirrors the optimized MarshalJSON by manually parsing JSON to avoid +// per-element interface dispatch and allocation overhead of the generic +// json.Unmarshal path for []schema.Float. +func (cm *CheckpointMetrics) UnmarshalJSON(input []byte) error { + // Minimal manual JSON parsing for the known structure: + // {"frequency":N,"start":N,"data":[...]} + // Field order may vary, so we parse field names. + + if len(input) < 2 || input[0] != '{' { + return fmt.Errorf("expected JSON object") + } + + i := 1 // skip '{' + for i < len(input) { + // Skip whitespace + for i < len(input) && (input[i] == ' ' || input[i] == '\t' || input[i] == '\n' || input[i] == '\r') { + i++ + } + if i >= len(input) || input[i] == '}' { + break + } + if input[i] == ',' { + i++ + continue + } + + // Parse field name + if input[i] != '"' { + return fmt.Errorf("expected field name at pos %d", i) + } + i++ + nameStart := i + for i < len(input) && input[i] != '"' { + i++ + } + fieldName := string(input[nameStart:i]) + i++ // skip closing '"' + + // Skip ':' + for i < len(input) && (input[i] == ' ' || input[i] == ':') { + i++ + } + + switch fieldName { + case "frequency": + numStart := i + for i < len(input) && input[i] != ',' && input[i] != '}' { + i++ + } + v, err := strconv.ParseInt(string(input[numStart:i]), 10, 64) + if err != nil { + return fmt.Errorf("invalid frequency: %w", err) + } + cm.Frequency = v + + case "start": + numStart := i + for i < len(input) && input[i] != ',' && input[i] != '}' { + i++ + } + v, err := strconv.ParseInt(string(input[numStart:i]), 10, 64) + if err != nil { + return fmt.Errorf("invalid start: %w", err) + } + cm.Start = v + + case "data": + if input[i] != '[' { + return fmt.Errorf("expected '[' for data array at pos %d", i) + } + i++ // skip '[' + + cm.Data = make([]schema.Float, 0, 256) + for i < len(input) { + // Skip whitespace + for i < len(input) && (input[i] == ' ' || input[i] == '\t' || input[i] == '\n' || input[i] == '\r') { + i++ + } + if i >= len(input) { + break + } + if input[i] == ']' { + i++ + break + } + if input[i] == ',' { + i++ + continue + } + + // Parse value: number or null + if input[i] == 'n' { + // "null" + cm.Data = append(cm.Data, schema.NaN) + i += 4 + } else { + numStart := i + for i < len(input) && input[i] != ',' && input[i] != ']' && 
input[i] != ' ' && input[i] != '\t' && input[i] != '\n' && input[i] != '\r' {
+						i++
+					}
+					v, err := strconv.ParseFloat(string(input[numStart:i]), 64)
+					if err != nil {
+						return fmt.Errorf("invalid data value: %w", err)
+					}
+					cm.Data = append(cm.Data, schema.Float(v))
+				}
+			}
+
+		default:
+			// Skip unknown field value (string, number, object, or array).
+			depth := 0
+			inStr := false
+		skipValue:
+			for i < len(input) {
+				if inStr {
+					if input[i] == '\\' {
+						i++
+					} else if input[i] == '"' {
+						inStr = false
+					}
+				} else {
+					switch input[i] {
+					case '"':
+						inStr = true
+					case '{', '[':
+						depth++
+					case '}', ']':
+						if depth == 0 {
+							break skipValue
+						}
+						depth--
+					case ',':
+						if depth == 0 {
+							break skipValue
+						}
+					}
+				}
+				i++
+			}
+		}
+	}
+
+	return nil
+}
+
 // MarshalJSON provides optimized JSON encoding for CheckpointMetrics.
 //
 // Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead.
@@ -371,51 +517,56 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 }
 
 // enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
-// Returns error if directory structure is invalid.
-func enqueueCheckpointHosts(dir string, work chan<- [2]string) error {
+// Returns the set of cluster names found, or an error if the directory structure is invalid.
+func enqueueCheckpointHosts(dir string, work chan<- [2]string) (map[string]struct{}, error) {
 	clustersDir, err := os.ReadDir(dir)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	gcCounter := 0
+	clusters := make(map[string]struct{}, len(clustersDir))
+
 	for _, clusterDir := range clustersDir {
 		if !clusterDir.IsDir() {
-			return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory")
+			return nil, errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory")
 		}
+
+		clusters[clusterDir.Name()] = struct{}{}
+
 		hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		for _, hostDir := range hostsDir {
 			if !hostDir.IsDir() {
-				return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory")
+				return nil, errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory")
 			}
 
-			gcCounter++
-			// if gcCounter%GCTriggerInterval == 0 {
-			// Forcing garbage collection runs here regulary during the loading of checkpoints
-			// will decrease the total heap size after loading everything back to memory is done.
-			// While loading data, the heap will grow fast, so the GC target size will double
-			// almost always. By forcing GCs here, we can keep it growing more slowly so that
-			// at the end, less memory is wasted.
-			// runtime.GC()
-			// }
-
			work <- [2]string{clusterDir.Name(), hostDir.Name()}
		}
	}
 
-	return nil
+	return clusters, nil
 }
 
 // FromCheckpoint loads checkpoint files from disk into memory in parallel.
 //
-// Uses worker pool to load cluster/host combinations. Periodically triggers GC
-// to prevent excessive heap growth. Returns number of files loaded and any errors.
+// Pre-creates cluster-level entries to reduce lock contention during parallel loading.
+// Uses a worker pool to load cluster/host combinations. Returns the number of files loaded and any errors.
 func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
+	// Pre-create cluster-level entries to eliminate write-lock contention on m.root
+	// during parallel loading. Workers only contend at the cluster level (independent).
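+	// A missing checkpoint root is not treated as fatal here: the pre-scan is a
+	// best-effort optimization, and the main loading path reports real errors.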
+ clusterDirs, err := os.ReadDir(dir) + if err != nil && !os.IsNotExist(err) { + return 0, err + } + for _, d := range clusterDirs { + if d.IsDir() { + m.root.findLevelOrCreate([]string{d.Name()}, len(m.Metrics)) + } + } + var wg sync.WaitGroup work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) @@ -436,7 +587,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { }() } - err := enqueueCheckpointHosts(dir, work) + _, err = enqueueCheckpointHosts(dir, work) close(work) wg.Wait() @@ -608,24 +759,6 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem return nil } -func (l *Level) loadJSONFile(m *MemoryStore, f *os.File, from int64) error { - br := bufio.NewReader(f) - cf := &CheckpointFile{} - if err := json.NewDecoder(br).Decode(cf); err != nil { - return err - } - - if cf.To != 0 && cf.To < from { - return nil - } - - if err := l.loadFile(cf, m); err != nil { - return err - } - - return nil -} - func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { for name, metric := range cf.Metrics { n := len(metric.Data) @@ -689,26 +822,16 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err return 0, err } - allFiles := make([]fs.DirEntry, 0) + allFiles := make([]fs.DirEntry, 0, len(direntries)) filesLoaded := 0 for _, e := range direntries { if e.IsDir() { - child := &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: make(map[string]*Level), - } - - files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from) - filesLoaded += files - if err != nil { - return filesLoaded, err - } - - l.children[e.Name()] = child + // Host-level directories should only contain files, not subdirectories. + // Skip unexpected subdirectories with a warning. 
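+			// (The previous implementation created a child Level here and
+			// recursed into the directory instead of skipping it.)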
+ cclog.Warnf("[METRICSTORE]> unexpected subdirectory '%s' in checkpoint dir '%s', skipping", e.Name(), dir) + continue } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { allFiles = append(allFiles, e) - } else { - continue } } @@ -717,20 +840,75 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err return filesLoaded, err } - loaders := map[string]func(*MemoryStore, *os.File, int64) error{ - ".json": l.loadJSONFile, - ".avro": l.loadAvroFile, + if len(files) == 0 { + return 0, nil } + // Separate files by type + var jsonFiles, avroFiles []string for _, filename := range files { - ext := filepath.Ext(filename) - loader := loaders[ext] - if loader == nil { - cclog.Warnf("Unknown extension for file %s", filename) - continue + switch filepath.Ext(filename) { + case ".json": + jsonFiles = append(jsonFiles, filename) + case ".avro": + avroFiles = append(avroFiles, filename) + default: + cclog.Warnf("[METRICSTORE]> unknown extension for file %s", filename) + } + } + + // Parallel JSON decoding: decode files concurrently, then apply sequentially + if len(jsonFiles) > 0 { + type decodedFile struct { + cf *CheckpointFile + err error } - // Use a closure to ensure file is closed immediately after use + decoded := make([]decodedFile, len(jsonFiles)) + var decodeWg sync.WaitGroup + + for i, filename := range jsonFiles { + decodeWg.Add(1) + go func(idx int, fname string) { + defer decodeWg.Done() + f, err := os.Open(path.Join(dir, fname)) + if err != nil { + decoded[idx] = decodedFile{err: err} + return + } + defer f.Close() + + cf := &CheckpointFile{} + if err := json.NewDecoder(bufio.NewReader(f)).Decode(cf); err != nil { + decoded[idx] = decodedFile{err: fmt.Errorf("decoding %s: %w", fname, err)} + return + } + + decoded[idx] = decodedFile{cf: cf} + }(i, filename) + } + + decodeWg.Wait() + + // Apply decoded files sequentially to maintain buffer ordering + for i, d := range decoded { + if d.err != nil { + return filesLoaded, d.err + } + + if d.cf.To != 0 && d.cf.To < from { + continue + } + + if err := l.loadFile(d.cf, m); err != nil { + return filesLoaded, fmt.Errorf("loading %s: %w", jsonFiles[i], err) + } + filesLoaded++ + } + } + + // Load Avro files sequentially (they modify Level state directly) + for _, filename := range avroFiles { err := func() error { f, err := os.Open(path.Join(dir, filename)) if err != nil { @@ -738,74 +916,84 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err } defer f.Close() - return loader(m, f, from) + return l.loadAvroFile(m, f, from) }() if err != nil { return filesLoaded, err } - - filesLoaded += 1 + filesLoaded++ } return filesLoaded, nil } -// This will probably get very slow over time! -// A solution could be some sort of an index file in which all other files -// and the timespan they contain is listed. -// NOTE: This now assumes that you have distinct timestamps for json and avro files -// Also, it assumes that the timestamps are not overlapping/self-modified. +// findFiles filters and sorts checkpoint files by timestamp. +// +// When findMoreRecentFiles is true, returns files with timestamp >= t (for loading), +// plus the immediately preceding file if it straddles the boundary. +// When false, returns files with timestamp <= t (for cleanup). +// +// Filters before sorting so only relevant files are sorted, keeping performance +// stable regardless of total directory size. 
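+// The returned filenames are sorted by their embedded timestamp in ascending order.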
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { - nums := map[string]int64{} + type fileEntry struct { + name string + ts int64 + } + + // Parse timestamps and pre-filter in a single pass + var candidates []fileEntry + var bestPreceding *fileEntry // Track the file just before the cutoff (for boundary straddling) + for _, e := range direntries { - if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { + name := e.Name() + if !strings.HasSuffix(name, ".json") && !strings.HasSuffix(name, ".avro") { continue } - ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64) + ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64) if err != nil { return nil, err } - nums[e.Name()] = ts - } - - sort.Slice(direntries, func(i, j int) bool { - a, b := direntries[i], direntries[j] - return nums[a.Name()] < nums[b.Name()] - }) - - if len(nums) == 0 { - return nil, nil - } - - filenames := make([]string, 0) - - for i, e := range direntries { - ts1 := nums[e.Name()] - - // Logic to look for files in forward or direction - // If logic: All files greater than or after - // the given timestamp will be selected - // Else If logic: All files less than or before - // the given timestamp will be selected - if findMoreRecentFiles && t <= ts1 { - filenames = append(filenames, e.Name()) - } else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 { - filenames = append(filenames, e.Name()) - } - if i == len(direntries)-1 { - continue - } - - enext := direntries[i+1] - ts2 := nums[enext.Name()] if findMoreRecentFiles { - if ts1 < t && t < ts2 { - filenames = append(filenames, e.Name()) + if ts >= t { + candidates = append(candidates, fileEntry{name, ts}) + } else { + // Track the most recent file before the cutoff for boundary straddling + if bestPreceding == nil || ts > bestPreceding.ts { + bestPreceding = &fileEntry{name, ts} + } + } + } else { + if ts <= t && ts != 0 { + candidates = append(candidates, fileEntry{name, ts}) } } } + // Include the boundary-straddling file if we found one and there are also files after the cutoff + if findMoreRecentFiles && bestPreceding != nil && len(candidates) > 0 { + candidates = append(candidates, *bestPreceding) + } + + if len(candidates) == 0 { + // If searching for recent files and we only have a preceding file, include it + if findMoreRecentFiles && bestPreceding != nil { + return []string{bestPreceding.name}, nil + } + return nil, nil + } + + // Sort only the filtered candidates + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].ts < candidates[j].ts + }) + + filenames := make([]string, len(candidates)) + for i, c := range candidates { + filenames[i] = c.name + } + return filenames, nil } diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 69ee3563..5fc1506f 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -54,7 +54,6 @@ import ( const ( DefaultMaxWorkers = 10 DefaultBufferCapacity = 512 - DefaultGCTriggerInterval = 100 DefaultAvroWorkers = 4 DefaultCheckpointBufferMin = 3 DefaultAvroCheckpointInterval = time.Minute diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 789c6d07..10bfae8a 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -151,6 +151,12 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W restoreFrom := startupTime.Add(-d) cclog.Infof("[METRICSTORE]> Loading checkpoints 
newer than %s\n", restoreFrom.Format(time.RFC3339)) + + // Lower GC target during loading to prevent excessive heap growth. + // During checkpoint loading the heap grows rapidly, causing the GC to + // double its target repeatedly. A lower percentage keeps it tighter. + oldGCPercent := debug.SetGCPercent(20) + files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix()) loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB if err != nil { @@ -159,13 +165,10 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W cclog.Infof("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) } - // Try to use less memory by forcing a GC run here and then - // lowering the target percentage. The default of 100 means - // that only once the ratio of new allocations execeds the - // previously active heap, a GC is triggered. - // Forcing a GC here will set the "previously active heap" - // to a minumum. - // runtime.GC() + // Restore GC target and force a collection to set a tight baseline + // for the "previously active heap" size, reducing long-term memory waste. + debug.SetGCPercent(oldGCPercent) + runtime.GC() ctx, shutdown := context.WithCancel(context.Background())