Apply optimizations to checkpoint loading

This commit is contained in:
2026-02-19 08:40:37 +01:00
parent 90b52f997d
commit 415467967d
3 changed files with 310 additions and 120 deletions

View File

@@ -58,7 +58,6 @@ import (
const ( const (
CheckpointFilePerms = 0o644 // File permissions for checkpoint files CheckpointFilePerms = 0o644 // File permissions for checkpoint files
CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories
GCTriggerInterval = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading
) )
// CheckpointMetrics represents metric data in a checkpoint file. // CheckpointMetrics represents metric data in a checkpoint file.
@@ -165,6 +164,153 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
// UnmarshalJSON provides optimized JSON decoding for CheckpointMetrics.
//
// Mirrors the optimized MarshalJSON by manually parsing JSON to avoid
// per-element interface dispatch and allocation overhead of the generic
// json.Unmarshal path for []schema.Float.
//
// Expected structure: {"frequency":N,"start":N,"data":[...]} with the
// fields in any order. Unknown fields are skipped. Array elements are
// numbers or null (decoded as schema.NaN). Whitespace anywhere between
// tokens is tolerated, so both compact and pretty-printed files load.
func (cm *CheckpointMetrics) UnmarshalJSON(input []byte) error {
	isSpace := func(c byte) bool {
		return c == ' ' || c == '\t' || c == '\n' || c == '\r'
	}
	i := 0
	// Allow leading whitespace before the opening brace.
	for i < len(input) && isSpace(input[i]) {
		i++
	}
	if i >= len(input) || input[i] != '{' {
		return fmt.Errorf("expected JSON object")
	}
	i++ // skip '{'
	for i < len(input) {
		for i < len(input) && isSpace(input[i]) {
			i++
		}
		if i >= len(input) || input[i] == '}' {
			break
		}
		if input[i] == ',' {
			i++
			continue
		}
		// Parse field name.
		if input[i] != '"' {
			return fmt.Errorf("expected field name at pos %d", i)
		}
		i++
		nameStart := i
		for i < len(input) && input[i] != '"' {
			i++
		}
		fieldName := string(input[nameStart:i])
		i++ // skip closing '"'
		// Skip whitespace, the ':', and any whitespace after it
		// (the original only skipped spaces, breaking on tabs/newlines).
		for i < len(input) && (isSpace(input[i]) || input[i] == ':') {
			i++
		}
		switch fieldName {
		case "frequency", "start":
			// Stop the scan at whitespace too, so ParseInt never sees
			// trailing '\n' from pretty-printed JSON.
			numStart := i
			for i < len(input) && input[i] != ',' && input[i] != '}' && !isSpace(input[i]) {
				i++
			}
			v, err := strconv.ParseInt(string(input[numStart:i]), 10, 64)
			if err != nil {
				return fmt.Errorf("invalid %s: %w", fieldName, err)
			}
			if fieldName == "frequency" {
				cm.Frequency = v
			} else {
				cm.Start = v
			}
		case "data":
			if i >= len(input) || input[i] != '[' {
				return fmt.Errorf("expected '[' for data array at pos %d", i)
			}
			i++ // skip '['
			cm.Data = make([]schema.Float, 0, 256)
			for i < len(input) {
				for i < len(input) && isSpace(input[i]) {
					i++
				}
				if i >= len(input) {
					break
				}
				if input[i] == ']' {
					i++
					break
				}
				if input[i] == ',' {
					i++
					continue
				}
				// Parse value: number or null.
				if input[i] == 'n' {
					// Verify the literal instead of blindly skipping 4
					// bytes, so corrupt input is reported, not mis-read.
					if i+4 > len(input) || string(input[i:i+4]) != "null" {
						return fmt.Errorf("invalid literal at pos %d", i)
					}
					cm.Data = append(cm.Data, schema.NaN)
					i += 4
				} else {
					numStart := i
					for i < len(input) && input[i] != ',' && input[i] != ']' && !isSpace(input[i]) {
						i++
					}
					v, err := strconv.ParseFloat(string(input[numStart:i]), 64)
					if err != nil {
						return fmt.Errorf("invalid data value: %w", err)
					}
					cm.Data = append(cm.Data, schema.Float(v))
				}
			}
		default:
			// Skip an unknown field's value: track string state and
			// nesting depth; the value ends at a ',' or closing
			// '}'/']' at depth zero. A done flag replaces the
			// original's goto, whose label immediately before the
			// switch's closing brace does not compile
			// ("missing statement after label").
			depth := 0
			inStr := false
			done := false
			for i < len(input) && !done {
				c := input[i]
				if inStr {
					switch c {
					case '\\':
						i++ // skip the escaped character
					case '"':
						inStr = false
					}
				} else {
					switch c {
					case '"':
						inStr = true
					case '{', '[':
						depth++
					case '}', ']':
						if depth == 0 {
							done = true
							continue // leave terminator for outer loop
						}
						depth--
					case ',':
						if depth == 0 {
							done = true
							continue
						}
					}
				}
				i++
			}
		}
	}
	return nil
}
// MarshalJSON provides optimized JSON encoding for CheckpointMetrics. // MarshalJSON provides optimized JSON encoding for CheckpointMetrics.
// //
// Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead. // Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead.
@@ -371,51 +517,56 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
} }
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs. // enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
// Returns error if directory structure is invalid. // Returns the set of cluster names found and any error if directory structure is invalid.
func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { func enqueueCheckpointHosts(dir string, work chan<- [2]string) (map[string]struct{}, error) {
clustersDir, err := os.ReadDir(dir) clustersDir, err := os.ReadDir(dir)
if err != nil { if err != nil {
return err return nil, err
} }
gcCounter := 0 clusters := make(map[string]struct{}, len(clustersDir))
for _, clusterDir := range clustersDir { for _, clusterDir := range clustersDir {
if !clusterDir.IsDir() { if !clusterDir.IsDir() {
return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") return nil, errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory")
} }
clusters[clusterDir.Name()] = struct{}{}
hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name())) hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
if err != nil { if err != nil {
return err return nil, err
} }
for _, hostDir := range hostsDir { for _, hostDir := range hostsDir {
if !hostDir.IsDir() { if !hostDir.IsDir() {
return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") return nil, errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory")
} }
gcCounter++
// if gcCounter%GCTriggerInterval == 0 {
// Forcing garbage collection runs here regularly during the loading of checkpoints
// will decrease the total heap size after loading everything back to memory is done.
// While loading data, the heap will grow fast, so the GC target size will double
// almost always. By forcing GCs here, we can keep it growing more slowly so that
// at the end, less memory is wasted.
// runtime.GC()
// }
work <- [2]string{clusterDir.Name(), hostDir.Name()} work <- [2]string{clusterDir.Name(), hostDir.Name()}
} }
} }
return nil return clusters, nil
} }
// FromCheckpoint loads checkpoint files from disk into memory in parallel. // FromCheckpoint loads checkpoint files from disk into memory in parallel.
// //
// Uses worker pool to load cluster/host combinations. Periodically triggers GC // Pre-creates cluster-level entries to reduce lock contention during parallel loading.
// to prevent excessive heap growth. Returns number of files loaded and any errors. // Uses worker pool to load cluster/host combinations. Returns number of files loaded and any errors.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
// Pre-create cluster-level entries to eliminate write-lock contention on m.root
// during parallel loading. Workers only contend at the cluster level (independent).
clusterDirs, err := os.ReadDir(dir)
if err != nil && !os.IsNotExist(err) {
return 0, err
}
for _, d := range clusterDirs {
if d.IsDir() {
m.root.findLevelOrCreate([]string{d.Name()}, len(m.Metrics))
}
}
var wg sync.WaitGroup var wg sync.WaitGroup
work := make(chan [2]string, Keys.NumWorkers*4) work := make(chan [2]string, Keys.NumWorkers*4)
n, errs := int32(0), int32(0) n, errs := int32(0), int32(0)
@@ -436,7 +587,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
}() }()
} }
err := enqueueCheckpointHosts(dir, work) _, err = enqueueCheckpointHosts(dir, work)
close(work) close(work)
wg.Wait() wg.Wait()
@@ -608,24 +759,6 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem
return nil return nil
} }
// loadJSONFile decodes one JSON checkpoint file and merges its contents
// into this level via loadFile. Files whose covered time range ends
// before 'from' are silently skipped (nil error).
func (l *Level) loadJSONFile(m *MemoryStore, f *os.File, from int64) error {
	checkpoint := &CheckpointFile{}
	if err := json.NewDecoder(bufio.NewReader(f)).Decode(checkpoint); err != nil {
		return err
	}
	// Entirely older than the requested window: nothing to load.
	if checkpoint.To != 0 && checkpoint.To < from {
		return nil
	}
	return l.loadFile(checkpoint, m)
}
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
for name, metric := range cf.Metrics { for name, metric := range cf.Metrics {
n := len(metric.Data) n := len(metric.Data)
@@ -689,26 +822,16 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
return 0, err return 0, err
} }
allFiles := make([]fs.DirEntry, 0) allFiles := make([]fs.DirEntry, 0, len(direntries))
filesLoaded := 0 filesLoaded := 0
for _, e := range direntries { for _, e := range direntries {
if e.IsDir() { if e.IsDir() {
child := &Level{ // Host-level directories should only contain files, not subdirectories.
metrics: make([]*buffer, len(m.Metrics)), // Skip unexpected subdirectories with a warning.
children: make(map[string]*Level), cclog.Warnf("[METRICSTORE]> unexpected subdirectory '%s' in checkpoint dir '%s', skipping", e.Name(), dir)
} continue
files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from)
filesLoaded += files
if err != nil {
return filesLoaded, err
}
l.children[e.Name()] = child
} else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") {
allFiles = append(allFiles, e) allFiles = append(allFiles, e)
} else {
continue
} }
} }
@@ -717,20 +840,75 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
return filesLoaded, err return filesLoaded, err
} }
loaders := map[string]func(*MemoryStore, *os.File, int64) error{ if len(files) == 0 {
".json": l.loadJSONFile, return 0, nil
".avro": l.loadAvroFile,
} }
// Separate files by type
var jsonFiles, avroFiles []string
for _, filename := range files { for _, filename := range files {
ext := filepath.Ext(filename) switch filepath.Ext(filename) {
loader := loaders[ext] case ".json":
if loader == nil { jsonFiles = append(jsonFiles, filename)
cclog.Warnf("Unknown extension for file %s", filename) case ".avro":
avroFiles = append(avroFiles, filename)
default:
cclog.Warnf("[METRICSTORE]> unknown extension for file %s", filename)
}
}
// Parallel JSON decoding: decode files concurrently, then apply sequentially
if len(jsonFiles) > 0 {
type decodedFile struct {
cf *CheckpointFile
err error
}
decoded := make([]decodedFile, len(jsonFiles))
var decodeWg sync.WaitGroup
for i, filename := range jsonFiles {
decodeWg.Add(1)
go func(idx int, fname string) {
defer decodeWg.Done()
f, err := os.Open(path.Join(dir, fname))
if err != nil {
decoded[idx] = decodedFile{err: err}
return
}
defer f.Close()
cf := &CheckpointFile{}
if err := json.NewDecoder(bufio.NewReader(f)).Decode(cf); err != nil {
decoded[idx] = decodedFile{err: fmt.Errorf("decoding %s: %w", fname, err)}
return
}
decoded[idx] = decodedFile{cf: cf}
}(i, filename)
}
decodeWg.Wait()
// Apply decoded files sequentially to maintain buffer ordering
for i, d := range decoded {
if d.err != nil {
return filesLoaded, d.err
}
if d.cf.To != 0 && d.cf.To < from {
continue continue
} }
// Use a closure to ensure file is closed immediately after use if err := l.loadFile(d.cf, m); err != nil {
return filesLoaded, fmt.Errorf("loading %s: %w", jsonFiles[i], err)
}
filesLoaded++
}
}
// Load Avro files sequentially (they modify Level state directly)
for _, filename := range avroFiles {
err := func() error { err := func() error {
f, err := os.Open(path.Join(dir, filename)) f, err := os.Open(path.Join(dir, filename))
if err != nil { if err != nil {
@@ -738,73 +916,83 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
} }
defer f.Close() defer f.Close()
return loader(m, f, from) return l.loadAvroFile(m, f, from)
}() }()
if err != nil { if err != nil {
return filesLoaded, err return filesLoaded, err
} }
filesLoaded++
filesLoaded += 1
} }
return filesLoaded, nil return filesLoaded, nil
} }
// This will probably get very slow over time! // findFiles filters and sorts checkpoint files by timestamp.
// A solution could be some sort of an index file in which all other files //
// and the timespan they contain is listed. // When findMoreRecentFiles is true, returns files with timestamp >= t (for loading),
// NOTE: This now assumes that you have distinct timestamps for json and avro files // plus the immediately preceding file if it straddles the boundary.
// Also, it assumes that the timestamps are not overlapping/self-modified. // When false, returns files with timestamp <= t (for cleanup).
//
// Filters before sorting so only relevant files are sorted, keeping performance
// stable regardless of total directory size.
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
nums := map[string]int64{} type fileEntry struct {
name string
ts int64
}
// Parse timestamps and pre-filter in a single pass
var candidates []fileEntry
var bestPreceding *fileEntry // Track the file just before the cutoff (for boundary straddling)
for _, e := range direntries { for _, e := range direntries {
if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { name := e.Name()
if !strings.HasSuffix(name, ".json") && !strings.HasSuffix(name, ".avro") {
continue continue
} }
ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64) ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nums[e.Name()] = ts
if findMoreRecentFiles {
if ts >= t {
candidates = append(candidates, fileEntry{name, ts})
} else {
// Track the most recent file before the cutoff for boundary straddling
if bestPreceding == nil || ts > bestPreceding.ts {
bestPreceding = &fileEntry{name, ts}
}
}
} else {
if ts <= t && ts != 0 {
candidates = append(candidates, fileEntry{name, ts})
}
}
} }
sort.Slice(direntries, func(i, j int) bool { // Include the boundary-straddling file if we found one and there are also files after the cutoff
a, b := direntries[i], direntries[j] if findMoreRecentFiles && bestPreceding != nil && len(candidates) > 0 {
return nums[a.Name()] < nums[b.Name()] candidates = append(candidates, *bestPreceding)
}) }
if len(nums) == 0 { if len(candidates) == 0 {
// If searching for recent files and we only have a preceding file, include it
if findMoreRecentFiles && bestPreceding != nil {
return []string{bestPreceding.name}, nil
}
return nil, nil return nil, nil
} }
filenames := make([]string, 0) // Sort only the filtered candidates
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].ts < candidates[j].ts
})
for i, e := range direntries { filenames := make([]string, len(candidates))
ts1 := nums[e.Name()] for i, c := range candidates {
filenames[i] = c.name
// Logic to look for files in forward or backward direction
// If logic: All files greater than or after
// the given timestamp will be selected
// Else If logic: All files less than or before
// the given timestamp will be selected
if findMoreRecentFiles && t <= ts1 {
filenames = append(filenames, e.Name())
} else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 {
filenames = append(filenames, e.Name())
}
if i == len(direntries)-1 {
continue
}
enext := direntries[i+1]
ts2 := nums[enext.Name()]
if findMoreRecentFiles {
if ts1 < t && t < ts2 {
filenames = append(filenames, e.Name())
}
}
} }
return filenames, nil return filenames, nil

View File

@@ -54,7 +54,6 @@ import (
const ( const (
DefaultMaxWorkers = 10 DefaultMaxWorkers = 10
DefaultBufferCapacity = 512 DefaultBufferCapacity = 512
DefaultGCTriggerInterval = 100
DefaultAvroWorkers = 4 DefaultAvroWorkers = 4
DefaultCheckpointBufferMin = 3 DefaultCheckpointBufferMin = 3
DefaultAvroCheckpointInterval = time.Minute DefaultAvroCheckpointInterval = time.Minute

View File

@@ -151,6 +151,12 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
restoreFrom := startupTime.Add(-d) restoreFrom := startupTime.Add(-d)
cclog.Infof("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) cclog.Infof("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
// Lower GC target during loading to prevent excessive heap growth.
// During checkpoint loading the heap grows rapidly, causing the GC to
// double its target repeatedly. A lower percentage keeps it tighter.
oldGCPercent := debug.SetGCPercent(20)
files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix()) files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix())
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
if err != nil { if err != nil {
@@ -159,13 +165,10 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
cclog.Infof("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) cclog.Infof("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
} }
// Try to use less memory by forcing a GC run here and then // Restore GC target and force a collection to set a tight baseline
// lowering the target percentage. The default of 100 means // for the "previously active heap" size, reducing long-term memory waste.
// that only once the ratio of new allocations exceeds the debug.SetGCPercent(oldGCPercent)
// previously active heap, a GC is triggered. runtime.GC()
// Forcing a GC here will set the "previously active heap"
// to a minimum.
// runtime.GC()
ctx, shutdown := context.WithCancel(context.Background()) ctx, shutdown := context.WithCancel(context.Background())