diff --git a/internal/metricstore/archive.go b/internal/metricstore/archive.go
index 972769fd..78efadfe 100644
--- a/internal/metricstore/archive.go
+++ b/internal/metricstore/archive.go
@@ -24,7 +24,7 @@ import (
 func Archiving(wg *sync.WaitGroup, ctx context.Context) {
 	go func() {
 		defer wg.Done()
-		d, err := time.ParseDuration(Keys.Archive.Interval)
+		d, err := time.ParseDuration(Keys.Archive.ArchiveInterval)
 		if err != nil {
 			cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
 		}
diff --git a/internal/metricstore/avroHelper.go b/internal/metricstore/avroHelper.go
index 5587a58d..985fdd78 100644
--- a/internal/metricstore/avroHelper.go
+++ b/internal/metricstore/avroHelper.go
@@ -30,8 +30,51 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
 		for {
 			select {
 			case <-ctx.Done():
-				return
-			case val := <-LineProtocolMessages:
+				// Drain any remaining messages in channel before exiting
+				for {
+					select {
+					case val, ok := <-LineProtocolMessages:
+						if !ok {
+							// Channel closed
+							return
+						}
+						// Process remaining message
+						freq, err := GetMetricFrequency(val.MetricName)
+						if err != nil {
+							continue
+						}
+
+						metricName := ""
+						for _, selectorName := range val.Selector {
+							metricName += selectorName + SelectorDelimiter
+						}
+						metricName += val.MetricName
+
+						var selector []string
+						selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
+
+						if !stringSlicesEqual(oldSelector, selector) {
+							avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
+							if avroLevel == nil {
+								cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
+							}
+							oldSelector = slices.Clone(selector)
+						}
+
+						if avroLevel != nil {
+							avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+						}
+					default:
+						// No more messages, exit
+						return
+					}
+				}
+			case val, ok := <-LineProtocolMessages:
+				if !ok {
+					// Channel closed, exit gracefully
+					return
+				}
+
 				// Fetch the frequency of the metric from the global configuration
 				freq, err := GetMetricFrequency(val.MetricName)
 				if err != nil {
@@ -65,7 +108,9 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
 					oldSelector = slices.Clone(selector)
 				}
 
-				avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+				if avroLevel != nil {
+					avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+				}
 			}
 		}
 	}()
diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go
index 27d611c4..fb72fc6e 100644
--- a/internal/metricstore/checkpoint.go
+++ b/internal/metricstore/checkpoint.go
@@ -408,7 +408,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 		return m.FromCheckpoint(dir, from, altFormat)
 	}
 
-	cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory")
+	cclog.Info("[METRICSTORE]> No valid checkpoint files found")
 
 	return 0, nil
 }
diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go
index c789f11c..9657453d 100644
--- a/internal/metricstore/config.go
+++ b/internal/metricstore/config.go
@@ -19,36 +19,49 @@ const (
 	DefaultAvroCheckpointInterval = time.Minute
 )
 
+type Checkpoints struct {
+	FileFormat string `json:"file-format"`
+	Interval   string `json:"interval"`
+	RootDir    string `json:"directory"`
+}
+
+type Debug struct {
+	DumpToFile string `json:"dump-to-file"`
+	EnableGops bool   `json:"gops"`
+}
+
+type Archive struct {
+	ArchiveInterval string `json:"interval"`
+	RootDir         string `json:"directory"`
+	DeleteInstead   bool   `json:"delete-instead"`
+}
+
+type Subscriptions []struct {
+	// Channel name
+	SubscribeTo string `json:"subscribe-to"`
+
+	// Allow lines without a cluster tag, use this as default, optional
+	ClusterTag string `json:"cluster-tag"`
+}
+
 type MetricStoreConfig struct {
 	// Number of concurrent workers for checkpoint and archive operations.
 	// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
-	NumWorkers  int `json:"num-workers"`
-	Checkpoints struct {
-		FileFormat string `json:"file-format"`
-		Interval   string `json:"interval"`
-		RootDir    string `json:"directory"`
-		Restore    string `json:"restore"`
-	} `json:"checkpoints"`
-	Debug struct {
-		DumpToFile string `json:"dump-to-file"`
-		EnableGops bool   `json:"gops"`
-	} `json:"debug"`
-	RetentionInMemory string `json:"retention-in-memory"`
-	Archive           struct {
-		Interval      string `json:"interval"`
-		RootDir       string `json:"directory"`
-		DeleteInstead bool   `json:"delete-instead"`
-	} `json:"archive"`
-	Subscriptions []struct {
-		// Channel name
-		SubscribeTo string `json:"subscribe-to"`
-
-		// Allow lines without a cluster tag, use this as default, optional
-		ClusterTag string `json:"cluster-tag"`
-	} `json:"subscriptions"`
+	NumWorkers        int            `json:"num-workers"`
+	RetentionInMemory string         `json:"retention-in-memory"`
+	MemoryCap         int            `json:"memory-cap"`
+	Checkpoints       Checkpoints    `json:"checkpoints"`
+	Debug             *Debug         `json:"debug"`
+	Archive           *Archive       `json:"archive"`
+	Subscriptions     *Subscriptions `json:"nats-subscriptions"`
 }
 
-var Keys MetricStoreConfig
+var Keys MetricStoreConfig = MetricStoreConfig{
+	Checkpoints: Checkpoints{
+		FileFormat: "avro",
+		RootDir:    "./var/checkpoints",
+	},
+}
 
 // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time!
 type AggregationStrategy int
diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go
index f1a20a73..c06bc6d9 100644
--- a/internal/metricstore/configSchema.go
+++ b/internal/metricstore/configSchema.go
@@ -6,90 +6,72 @@ package metricstore
 
 const configSchema = `{
-    "type": "object",
-    "description": "Configuration specific to built-in metric-store.",
-    "properties": {
-        "checkpoints": {
-            "description": "Configuration for checkpointing the metrics within metric-store",
-            "type": "object",
-            "properties": {
-                "file-format": {
-                    "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
-                    "type": "string"
-                },
-                "interval": {
-                    "description": "Interval at which the metrics should be checkpointed.",
-                    "type": "string"
-                },
-                "directory": {
-                    "description": "Specify the parent directy in which the checkpointed files should be placed.",
-                    "type": "string"
-                },
-                "restore": {
-                    "description": "When cc-backend starts up, look for checkpointed files that are less than X hours old and load metrics from these selected checkpoint files.",
-                    "type": "string"
-                }
-            }
+  "type": "object",
+  "description": "Configuration specific to built-in metric-store.",
+  "properties": {
+    "num-workers": {
+      "description": "Number of concurrent workers for checkpoint and archive operations",
+      "type": "integer"
+    },
+    "checkpoints": {
+      "description": "Configuration for checkpointing the metrics within metric-store",
+      "type": "object",
+      "properties": {
+        "file-format": {
+          "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
+          "type": "string"
        },
-        "archive": {
-            "description": "Configuration for archiving the already checkpointed files.",
-            "type": "object",
-            "properties": {
-                "interval": {
-                    "description": "Interval at which the checkpointed files should be archived.",
-                    "type": "string"
-                },
-                "directory": {
-                    "description": "Specify the parent directy in which the archived files should be placed.",
-                    "type": "string"
-                }
-            }
+        "interval": {
+          "description": "Interval at which the metrics should be checkpointed.",
+          "type": "string"
        },
-        "retention-in-memory": {
-            "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
-            "type": "string"
-        },
-        "nats": {
-            "description": "Configuration for accepting published data through NATS.",
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "address": {
-                        "description": "Address of the NATS server.",
-                        "type": "string"
-                    },
-                    "username": {
-                        "description": "Optional: If configured with username/password method.",
-                        "type": "string"
-                    },
-                    "password": {
-                        "description": "Optional: If configured with username/password method.",
-                        "type": "string"
-                    },
-                    "creds-file-path": {
-                        "description": "Optional: If configured with Credential File method. Path to your NATS cred file.",
-                        "type": "string"
-                    },
-                    "subscriptions": {
-                        "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.",
-                        "type": "array",
-                        "items": {
-                            "type": "object",
-                            "properties": {
-                                "subscribe-to": {
-                                    "description": "Channel name",
-                                    "type": "string"
-                                },
-                                "cluster-tag": {
-                                    "description": "Optional: Allow lines without a cluster tag, use this as default",
-                                    "type": "string"
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+        "directory": {
+          "description": "Specify the parent directory in which the checkpointed files should be placed.",
+          "type": "string"
        }
+      },
+      "required": ["interval"]
+    },
+    "archive": {
+      "description": "Configuration for archiving the already checkpointed files.",
+      "type": "object",
+      "properties": {
+        "interval": {
+          "description": "Interval at which the checkpointed files should be archived.",
+          "type": "string"
+        },
+        "directory": {
+          "description": "Specify the directory in which the archived files should be placed.",
+          "type": "string"
+        }
+      },
+      "required": ["interval", "directory"]
+    },
+    "retention-in-memory": {
+      "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
+      "type": "string"
+    },
+    "memory-cap": {
+      "description": "Upper memory capacity limit used by metricstore in GB",
+      "type": "integer"
+    },
+    "nats-subscriptions": {
+      "description": "Array of various subscriptions. Allows to subscribe to different subjects and publishers.",
+      "type": "array",
+      "items": {
+        "type": "object",
+        "properties": {
+          "subscribe-to": {
+            "description": "Channel name",
+            "type": "string"
+          },
+          "cluster-tag": {
+            "description": "Optional: Allow lines without a cluster tag, use this as default",
+            "type": "string"
+          }
+        }
+      }
+    }
    }
+  },
+  "required": ["checkpoints", "retention-in-memory"]
 }`
diff --git a/internal/metricstore/lineprotocol.go b/internal/metricstore/lineprotocol.go
index cc59e213..ca8ea138 100644
--- a/internal/metricstore/lineprotocol.go
+++ b/internal/metricstore/lineprotocol.go
@@ -29,29 +29,30 @@ func ReceiveNats(ms *MemoryStore,
 	}
 
 	var wg sync.WaitGroup
 
-	msgs := make(chan []byte, workers*2)
 
-	for _, sc := range Keys.Subscriptions {
+	for _, sc := range *Keys.Subscriptions {
 		clusterTag := sc.ClusterTag
 		if workers > 1 {
 			wg.Add(workers)
 
 			for range workers {
 				go func() {
+					defer wg.Done()
 					for m := range msgs {
 						dec := lineprotocol.NewDecoderWithBytes(m)
 						if err := DecodeLine(dec, ms, clusterTag); err != nil {
 							cclog.Errorf("error: %s", err.Error())
 						}
 					}
-
-					wg.Done()
 				}()
 			}
 
 			nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
-				msgs <- data
+				select {
+				case msgs <- data:
+				case <-ctx.Done():
+				}
 			})
 		} else {
 			nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
@@ -64,7 +65,11 @@ func ReceiveNats(ms *MemoryStore,
 		cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo)
 	}
 
-	close(msgs)
+	go func() {
+		<-ctx.Done()
+		close(msgs)
+	}()
+
 	wg.Wait()
 
 	return nil
diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go
index d75c9ef8..7c17cd88 100644
--- a/internal/metricstore/metricstore.go
+++ b/internal/metricstore/metricstore.go
@@ -25,6 +25,7 @@ import (
 	"encoding/json"
 	"errors"
 	"runtime"
+	"slices"
 	"sync"
 	"time"
 
@@ -61,7 +62,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 	if rawConfig != nil {
 		config.Validate(configSchema, rawConfig)
 		dec := json.NewDecoder(bytes.NewReader(rawConfig))
-		// dec.DisallowUnknownFields()
+		dec.DisallowUnknownFields()
 		if err := dec.Decode(&Keys); err != nil {
 			cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error())
 		}
@@ -103,7 +104,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 
 	ms := GetMemoryStore()
 
-	d, err := time.ParseDuration(Keys.Checkpoints.Restore)
+	d, err := time.ParseDuration(Keys.RetentionInMemory)
 	if err != nil {
 		cclog.Fatal(err)
 	}
@@ -128,11 +129,21 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 
 	ctx, shutdown := context.WithCancel(context.Background())
 
-	wg.Add(4)
+	retentionGoroutines := 1
+	checkpointingGoroutines := 1
+	dataStagingGoroutines := 1
+	archivingGoroutines := 0
+	if Keys.Archive != nil {
+		archivingGoroutines = 1
+	}
+	totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines
+	wg.Add(totalGoroutines)
 	Retention(wg, ctx)
 	Checkpointing(wg, ctx)
 
-	Archiving(wg, ctx)
+	if Keys.Archive != nil {
+		Archiving(wg, ctx)
+	}
 	DataStaging(wg, ctx)
 
 	// Note: Signal handling has been removed from this function.
@@ -141,9 +152,11 @@
 
 	// Store the shutdown function for later use by Shutdown()
 	shutdownFunc = shutdown
 
-	err = ReceiveNats(ms, 1, ctx)
-	if err != nil {
-		cclog.Fatal(err)
+	if Keys.Subscriptions != nil {
+		err = ReceiveNats(ms, 1, ctx)
+		if err != nil {
+			cclog.Fatal(err)
+		}
 	}
 }
@@ -184,11 +197,14 @@ func GetMemoryStore() *MemoryStore {
 }
 
 func Shutdown() {
-	// Cancel the context to signal all background goroutines to stop
 	if shutdownFunc != nil {
 		shutdownFunc()
 	}
 
+	if Keys.Checkpoints.FileFormat != "json" {
+		close(LineProtocolMessages)
+	}
+
 	cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
 	var files int
 	var err error
@@ -199,7 +215,6 @@ func Shutdown() {
 		files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
 	} else {
 		files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true)
-		close(LineProtocolMessages)
 	}
 
 	if err != nil {
@@ -314,11 +329,8 @@ func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]str
 			// Check if the key exists in our exclusion map
 			if excludedValues, exists := excludeSelectors[key]; exists {
 				// The key exists, now check if the specific value is in the exclusion list
-				for _, ev := range excludedValues {
-					if ev == value {
-						exclude = true
-						break
-					}
+				if slices.Contains(excludedValues, value) {
+					exclude = true
 				}
 			}
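
Example configuration (illustrative sketch, not part of this patch): a metric-store config section that satisfies the revised configSchema and decodes into the new MetricStoreConfig layout. The keys come from the structs and schema above; the concrete values (durations, memory cap, NATS subject, cluster tag) are placeholders chosen only for illustration.

    {
      "num-workers": 4,
      "retention-in-memory": "48h",
      "memory-cap": 128,
      "checkpoints": {
        "file-format": "avro",
        "interval": "1h",
        "directory": "./var/checkpoints"
      },
      "archive": {
        "interval": "24h",
        "directory": "./var/archive",
        "delete-instead": false
      },
      "nats-subscriptions": [
        {
          "subscribe-to": "cc.metrics",
          "cluster-tag": "testcluster"
        }
      ]
    }

Because Init now enables dec.DisallowUnknownFields(), older configs that still carry the removed checkpoints "restore" key, or that use the old "subscriptions" key instead of "nats-subscriptions", fail to decode rather than being silently ignored. "archive" and "nats-subscriptions" are optional pointer fields: when they are omitted, Archiving and ReceiveNats are simply not started.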