diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go index 6483e74..1878952 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -75,10 +75,10 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns var wg sync.WaitGroup n, errs := int32(0), int32(0) - work := make(chan workItem, NumWorkers) + work := make(chan workItem, Keys.NumWorkers) - wg.Add(NumWorkers) - for worker := 0; worker < NumWorkers; worker++ { + wg.Add(Keys.NumWorkers) + for worker := 0; worker < Keys.NumWorkers; worker++ { go func() { defer wg.Done() for workItem := range work { @@ -116,7 +116,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns } if errs > 0 { - return int(n), fmt.Errorf("%d errors happend while archiving (%d successes)", errs, n) + return int(n), fmt.Errorf("%d errors happened while archiving (%d successes)", errs, n) } return int(n), nil } @@ -147,11 +147,11 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead } filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from)) - f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(archiveDir, 0o755) + err = os.MkdirAll(archiveDir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) + f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) } } if err != nil { diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index a942d0a..cd2fd8f 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -105,46 +105,6 @@ func (b *buffer) firstWrite() int64 { func (b *buffer) close() {} -/* -func (b *buffer) close() { - if b.closed { - return - } - - b.closed = true - n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64 - for _, x := range b.data { - if x.IsNaN() { - continue - } - - n += 1 - f := float64(x) - sum += f - min = math.Min(min, f) - max = math.Max(max, f) - } - - b.statisticts.samples = n - if n > 0 { - b.statisticts.avg = Float(sum / float64(n)) - b.statisticts.min = Float(min) - b.statisticts.max = Float(max) - } else { - b.statisticts.avg = NaN - b.statisticts.min = NaN - b.statisticts.max = NaN - } -} -*/ - -// func interpolate(idx int, data []Float) Float { -// if idx == 0 || idx+1 == len(data) { -// return NaN -// } -// return (data[idx-1] + data[idx+1]) / 2.0 -// } - // Return all known values from `from` to `to`. Gaps of information are represented as NaN. // Simple linear interpolation is done between the two neighboring cells if possible. 
// If values at the start or end are missing, instead of NaN values, the second and third diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index fe09b9e..8b1705c 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -28,6 +28,17 @@ import ( "github.com/linkedin/goavro/v2" ) +// File operation constants +const ( + // CheckpointFilePerms defines default permissions for checkpoint files + CheckpointFilePerms = 0o644 + // CheckpointDirPerms defines default permissions for checkpoint directories + CheckpointDirPerms = 0o755 + // GCTriggerInterval is the number of loaded hosts after which forced GC starts during checkpoint loading; + // once more than GCTriggerInterval hosts are loaded, GC is triggered every NumWorkers hosts + GCTriggerInterval = 100 +) + // Whenever changed, update MarshalJSON as well! type CheckpointMetrics struct { Data []schema.Float `json:"data"` @@ -171,9 +182,9 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { n, errs := int32(0), int32(0) var wg sync.WaitGroup - wg.Add(NumWorkers) - work := make(chan workItem, NumWorkers*2) - for worker := 0; worker < NumWorkers; worker++ { + wg.Add(Keys.NumWorkers) + work := make(chan workItem, Keys.NumWorkers*2) + for worker := 0; worker < Keys.NumWorkers; worker++ { go func() { defer wg.Done() @@ -205,7 +216,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { wg.Wait() if errs > 0 { - return int(n), fmt.Errorf("[METRICSTORE]> %d errors happend while creating checkpoints (%d successes)", errs, n) + return int(n), fmt.Errorf("[METRICSTORE]> %d errors happened while creating checkpoints (%d successes)", errs, n) } return int(n), nil } @@ -285,11 +296,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { } filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(dir, 0o755) + err = os.MkdirAll(dir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) } } if err != nil { @@ -307,11 +318,11 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { var wg sync.WaitGroup - work := make(chan [2]string, NumWorkers) + work := make(chan [2]string, Keys.NumWorkers) n, errs := int32(0), int32(0) - wg.Add(NumWorkers) - for worker := 0; worker < NumWorkers; worker++ { + wg.Add(Keys.NumWorkers) + for worker := 0; worker < Keys.NumWorkers; worker++ { go func() { defer wg.Done() for host := range work { @@ -347,7 +358,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( } i++ - if i%NumWorkers == 0 && i > 100 { + if i%Keys.NumWorkers == 0 && i > GCTriggerInterval { // Forcing garbage collection runs here regularly during the loading of checkpoints // will decrease the total heap size after loading everything back to memory is done.
// While loading data, the heap will grow fast, so the GC target size will double @@ -368,7 +379,7 @@ done: } if errs > 0 { - return int(n), fmt.Errorf("[METRICSTORE]> %d errors happend while creating checkpoints (%d successes)", errs, n) + return int(n), fmt.Errorf("[METRICSTORE]> %d errors happened while creating checkpoints (%d successes)", errs, n) } return int(n), nil } @@ -379,7 +390,7 @@ done: func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { // The directory does not exist, so create it using os.MkdirAll() - err := os.MkdirAll(dir, 0o755) // 0755 sets the permissions for the directory + err := os.MkdirAll(dir, CheckpointDirPerms) // CheckpointDirPerms sets the permissions for the directory if err != nil { cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) } @@ -464,7 +475,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { // Create a new OCF reader from the buffered reader ocfReader, err := goavro.NewOCFReader(br) if err != nil { - panic(err) + return fmt.Errorf("[METRICSTORE]> error creating OCF reader: %w", err) } metricsData := make(map[string]schema.FloatArray) @@ -477,7 +488,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { record, ok := datum.(map[string]any) if !ok { - panic("[METRICSTORE]> failed to assert datum as map[string]interface{}") + return fmt.Errorf("[METRICSTORE]> failed to assert datum as map[string]interface{}") } for key, value := range record { @@ -559,7 +570,7 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem l.metrics[minfo.offset] = b } else { if prev.start > b.start { - return errors.New("wooops") + return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start) } b.prev = prev @@ -623,7 +634,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { l.metrics[minfo.offset] = b } else { if prev.start > b.start { - return errors.New("wooops") + return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start) } b.prev = prev @@ -700,13 +711,17 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension loader := loaders[extension] for _, filename := range files { - f, err := os.Open(path.Join(dir, filename)) - if err != nil { - return filesLoaded, err - } - defer f.Close() + // Use a closure to ensure file is closed immediately after use + err := func() error { + f, err := os.Open(path.Join(dir, filename)) + if err != nil { + return err + } + defer f.Close() - if err = loader(m, f, from); err != nil { + return loader(m, f, from) + }() + if err != nil { return filesLoaded, err } diff --git a/internal/memorystore/config.go b/internal/memorystore/config.go index 7baef97..8196ed6 100644 --- a/internal/memorystore/config.go +++ b/internal/memorystore/config.go @@ -12,6 +12,9 @@ import ( var InternalCCMSFlag bool = false type MetricStoreConfig struct { + // Number of concurrent workers for checkpoint and archive operations. 
+ // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) + NumWorkers int `json:"num-workers"` Checkpoints struct { FileFormat string `json:"file-format"` Interval string `json:"interval"` @@ -62,7 +65,7 @@ const ( AvgAggregation ) -func AssignAggregationStratergy(str string) (AggregationStrategy, error) { +func AssignAggregationStrategy(str string) (AggregationStrategy, error) { switch str { case "": return NoAggregation, nil diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index 54f7c4a..aaa1210 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -39,7 +39,7 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { // Children map needs to be created... l.lock.RUnlock() } else { - child, ok := l.children[selector[0]] + child, ok = l.children[selector[0]] l.lock.RUnlock() if ok { return child.findLevelOrCreate(selector[1:], nMetrics) diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index d76b83b..3e372f3 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -3,6 +3,20 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// Package memorystore provides an efficient in-memory time-series metric storage system +// with support for hierarchical data organization, checkpointing, and archiving. +// +// The package organizes metrics in a tree structure (cluster → host → component) and +// provides concurrent read/write access to metric data with configurable aggregation strategies. +// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data, +// and enforcing retention policies. +// +// Key features: +// - In-memory metric storage with configurable retention +// - Hierarchical data organization (selectors) +// - Concurrent checkpoint/archive workers +// - Support for sum and average aggregation +// - NATS integration for metric ingestion package memorystore import ( @@ -10,18 +24,14 @@ import ( "context" "encoding/json" "errors" - "os" - "os/signal" "runtime" "sync" - "syscall" "time" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/resampler" - "github.com/ClusterCockpit/cc-lib/runtimeEnv" "github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/util" ) @@ -29,14 +39,12 @@ import ( var ( singleton sync.Once msInstance *MemoryStore + // shutdownFunc stores the context cancellation function created in Init + // and is called during Shutdown to cancel all background goroutines + shutdownFunc context.CancelFunc ) -var NumWorkers int = 4 -func init() { - maxWorkers := 10 - NumWorkers = min(runtime.NumCPU()/2+1, maxWorkers) -} type Metric struct { Name string @@ -61,30 +69,34 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } + // Set NumWorkers from config or use default + if Keys.NumWorkers <= 0 { + maxWorkers := 10 + Keys.NumWorkers = min(runtime.NumCPU()/2+1, maxWorkers) + } + cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) + + // Helper function to add metric configuration + addMetricConfig := func(mc schema.MetricConfig) { + agg, err := AssignAggregationStrategy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) + } + + 
AddMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + for _, c := range archive.Clusters { for _, mc := range c.MetricConfig { - agg, err := AssignAggregationStratergy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) + addMetricConfig(*mc) } for _, sc := range c.SubClusters { for _, mc := range sc.MetricConfig { - agg, err := AssignAggregationStratergy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) + addMetricConfig(mc) } } } @@ -126,15 +138,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { Archiving(wg, ctx) DataStaging(wg, ctx) - wg.Add(1) - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - defer wg.Done() - <-sigs - runtimeEnv.SystemdNotifiy(false, "[METRICSTORE]> Shutting down ...") - shutdown() - }() + // Note: Signal handling has been removed from this function. + // The caller is responsible for handling shutdown signals and calling + // the shutdown() function when appropriate. + // Store the shutdown function for later use by Shutdown() + shutdownFunc = shutdown if Keys.Nats != nil { for _, natsConf := range Keys.Nats { @@ -190,6 +198,11 @@ func GetMemoryStore() *MemoryStore { } func Shutdown() { + // Cancel the context to signal all background goroutines to stop + if shutdownFunc != nil { + shutdownFunc() + } + cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) var files int var err error @@ -207,70 +220,8 @@ func Shutdown() { cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) } cclog.Infof("[METRICSTORE]> Done! 
(%d files written)\n", files) - - // ms.PrintHeirarchy() } -// func (m *MemoryStore) PrintHeirarchy() { -// m.root.lock.Lock() -// defer m.root.lock.Unlock() - -// fmt.Printf("Root : \n") - -// for lvl1, sel1 := range m.root.children { -// fmt.Printf("\t%s\n", lvl1) -// for lvl2, sel2 := range sel1.children { -// fmt.Printf("\t\t%s\n", lvl2) -// if lvl1 == "fritz" && lvl2 == "f0201" { - -// for name, met := range m.Metrics { -// mt := sel2.metrics[met.Offset] - -// fmt.Printf("\t\t\t\t%s\n", name) -// fmt.Printf("\t\t\t\t") - -// for mt != nil { -// // if name == "cpu_load" { -// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) -// // } -// mt = mt.prev -// } -// fmt.Printf("\n") - -// } -// } -// for lvl3, sel3 := range sel2.children { -// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" { - -// fmt.Printf("\t\t\t\t\t%s\n", lvl3) - -// for name, met := range m.Metrics { -// mt := sel3.metrics[met.Offset] - -// fmt.Printf("\t\t\t\t\t\t%s\n", name) - -// fmt.Printf("\t\t\t\t\t\t") - -// for mt != nil { -// // if name == "clock" { -// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) - -// mt = mt.prev -// } -// fmt.Printf("\n") - -// } - -// // for i, _ := range sel3.metrics { -// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i)) -// // } -// } -// } -// } -// } - -// } - func getName(m *MemoryStore, i int) string { for key, val := range m.Metrics { if val.offset == i { diff --git a/internal/memorystore/memorystore_test.go b/internal/memorystore/memorystore_test.go new file mode 100644 index 0000000..b8ab090 --- /dev/null +++ b/internal/memorystore/memorystore_test.go @@ -0,0 +1,156 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package memorystore + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/schema" +) + +func TestAssignAggregationStrategy(t *testing.T) { + tests := []struct { + name string + input string + expected AggregationStrategy + wantErr bool + }{ + {"empty string", "", NoAggregation, false}, + {"sum", "sum", SumAggregation, false}, + {"avg", "avg", AvgAggregation, false}, + {"invalid", "invalid", NoAggregation, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := AssignAggregationStrategy(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("AssignAggregationStrategy(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + return + } + if result != tt.expected { + t.Errorf("AssignAggregationStrategy(%q) = %v, want %v", tt.input, result, tt.expected) + } + }) + } +} + +func TestAddMetric(t *testing.T) { + // Reset Metrics before test + Metrics = make(map[string]MetricConfig) + + err := AddMetric("test_metric", MetricConfig{ + Frequency: 60, + Aggregation: SumAggregation, + }) + if err != nil { + t.Errorf("AddMetric() error = %v", err) + } + + if _, ok := Metrics["test_metric"]; !ok { + t.Error("AddMetric() did not add metric to Metrics map") + } + + // Test updating with higher frequency + err = AddMetric("test_metric", MetricConfig{ + Frequency: 120, + Aggregation: SumAggregation, + }) + if err != nil { + t.Errorf("AddMetric() error = %v", err) + } + + if Metrics["test_metric"].Frequency != 120 { + t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency) + } + + // Test updating with lower frequency (should not update) + err = AddMetric("test_metric", MetricConfig{ + Frequency: 30, + Aggregation: SumAggregation, + }) + if err != nil { + t.Errorf("AddMetric() error = %v", err) + } + + if Metrics["test_metric"].Frequency != 120 { + t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency) + } +} + +func TestGetMetricFrequency(t *testing.T) { + // Reset Metrics before test + Metrics = map[string]MetricConfig{ + "test_metric": { + Frequency: 60, + Aggregation: SumAggregation, + }, + } + + freq, err := GetMetricFrequency("test_metric") + if err != nil { + t.Errorf("GetMetricFrequency() error = %v", err) + } + if freq != 60 { + t.Errorf("GetMetricFrequency() = %d, want 60", freq) + } + + _, err = GetMetricFrequency("nonexistent") + if err == nil { + t.Error("GetMetricFrequency() expected error for nonexistent metric") + } +} + +func TestBufferWrite(t *testing.T) { + b := newBuffer(100, 10) + + // Test writing value + nb, err := b.write(100, schema.Float(42.0)) + if err != nil { + t.Errorf("buffer.write() error = %v", err) + } + if nb != b { + t.Error("buffer.write() created new buffer unexpectedly") + } + if len(b.data) != 1 { + t.Errorf("buffer.write() len(data) = %d, want 1", len(b.data)) + } + if b.data[0] != schema.Float(42.0) { + t.Errorf("buffer.write() data[0] = %v, want 42.0", b.data[0]) + } + + // Test writing value from past (should error) + _, err = b.write(50, schema.Float(10.0)) + if err == nil { + t.Error("buffer.write() expected error for past timestamp") + } +} + +func TestBufferRead(t *testing.T) { + b := newBuffer(100, 10) + + // Write some test data + b.write(100, schema.Float(1.0)) + b.write(110, schema.Float(2.0)) + b.write(120, schema.Float(3.0)) + + // Read data + data := make([]schema.Float, 3) + result, from, to, err := b.read(100, 130, data) + if err != nil { + t.Errorf("buffer.read() error = %v", err) + } + // Buffer read should return from as 
firstWrite (start + freq/2) + if from != 100 { + t.Errorf("buffer.read() from = %d, want 100", from) + } + if to != 130 { + t.Errorf("buffer.read() to = %d, want 130", to) + } + if len(result) != 3 { + t.Errorf("buffer.read() len(result) = %d, want 3", len(result)) + } +}
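`fromCheckpoint` now opens each checkpoint file inside an immediately invoked closure so that `defer f.Close()` runs at the end of every loop iteration instead of piling up until the whole function returns. Below is the same idiom in isolation, a minimal sketch in which the `loadAll` function and the `processFile` stand-in for the real loader are hypothetical:

```go
package main

import (
	"fmt"
	"os"
)

// processFile is a hypothetical stand-in for the real per-file loader.
func processFile(f *os.File) error {
	_, err := f.Stat()
	return err
}

func loadAll(filenames []string) error {
	for _, name := range filenames {
		// The closure gives each file its own defer scope: f is closed as soon
		// as this iteration finishes, not when loadAll eventually returns.
		err := func() error {
			f, err := os.Open(name)
			if err != nil {
				return err
			}
			defer f.Close()
			return processFile(f)
		}()
		if err != nil {
			return fmt.Errorf("loading %s failed: %w", name, err)
		}
	}
	return nil
}

func main() {
	if err := loadAll(os.Args[1:]); err != nil {
		fmt.Println(err)
	}
}
```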
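The `num-workers` field added to `MetricStoreConfig` replaces the old package-level `NumWorkers` variable. Below is a minimal sketch of how the value is resolved, mirroring the fallback added to `Init`; the config literal is illustrative only and every other `MetricStoreConfig` field is omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
	"runtime"
)

func main() {
	// Illustrative raw config; in cc-backend this comes from the metric-store config section.
	rawConfig := []byte(`{"num-workers": 8}`)

	// Only the field discussed here; the real MetricStoreConfig carries more fields.
	var cfg struct {
		NumWorkers int `json:"num-workers"`
	}
	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		panic(err)
	}

	// Same default as the logic added to Init: half the CPUs plus one, capped at 10.
	if cfg.NumWorkers <= 0 {
		cfg.NumWorkers = min(runtime.NumCPU()/2+1, 10)
	}

	fmt.Println("checkpoint/archive workers:", cfg.NumWorkers)
}
```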
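With signal handling removed from `Init`, the embedding application now owns shutdown wiring. Below is a minimal sketch of what a caller might do to recreate the old behavior, assuming it simply waits for SIGINT/SIGTERM and then calls the exported `Shutdown`; the `waitForShutdown` helper name is hypothetical and `Init` is only indicated in a comment since its real configuration is application-specific:

```go
package main

import (
	"context"
	"os/signal"
	"sync"
	"syscall"

	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
)

// waitForShutdown blocks until SIGINT or SIGTERM arrives, then stops the
// memory store. It is meant to run after memorystore.Init has started the
// background checkpoint/archive/staging goroutines.
func waitForShutdown(wg *sync.WaitGroup) {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	<-ctx.Done()

	// Shutdown cancels the background goroutines via the stored shutdownFunc
	// and writes a final checkpoint before returning.
	memorystore.Shutdown()
	wg.Wait()
}

func main() {
	var wg sync.WaitGroup

	// memorystore.Init(rawConfig, &wg) would be called here with the real
	// metric-store configuration before waiting for shutdown signals.

	waitForShutdown(&wg)
}
```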