Restructure configuration with sensible defaults. Fix shutdown hangs

2026-01-15 11:33:01 +01:00
parent e1efc68476
commit d59aa2e855
7 changed files with 189 additions and 132 deletions

View File

@@ -24,7 +24,7 @@ import (
func Archiving(wg *sync.WaitGroup, ctx context.Context) {
go func() {
defer wg.Done()
d, err := time.ParseDuration(Keys.Archive.Interval)
d, err := time.ParseDuration(Keys.Archive.ArchiveInterval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
}

View File

@@ -30,8 +30,51 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case val := <-LineProtocolMessages:
// Drain any remaining messages in channel before exiting
for {
select {
case val, ok := <-LineProtocolMessages:
if !ok {
// Channel closed
return
}
// Process remaining message
freq, err := GetMetricFrequency(val.MetricName)
if err != nil {
continue
}
metricName := ""
for _, selectorName := range val.Selector {
metricName += selectorName + SelectorDelimiter
}
metricName += val.MetricName
var selector []string
selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
if !stringSlicesEqual(oldSelector, selector) {
avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
if avroLevel == nil {
cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
}
oldSelector = slices.Clone(selector)
}
if avroLevel != nil {
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
}
default:
// No more messages, exit
return
}
}
case val, ok := <-LineProtocolMessages:
if !ok {
// Channel closed, exit gracefully
return
}
// Fetch the frequency of the metric from the global configuration
freq, err := GetMetricFrequency(val.MetricName)
if err != nil {
@@ -65,7 +108,9 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
oldSelector = slices.Clone(selector)
}
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
if avroLevel != nil {
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
}
}
}
}()
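
The drain-on-cancel structure above generalizes. A minimal runnable sketch of the same pattern, with placeholder channel and handler names rather than identifiers from this commit:

package main

import (
	"context"
	"fmt"
)

// consume mirrors the DataStaging shutdown fix: once the context is
// cancelled, buffered messages are drained before returning, so nothing
// staged in the channel is silently dropped.
func consume(ctx context.Context, msgs <-chan string, handle func(string)) {
	for {
		select {
		case <-ctx.Done():
			for { // sweep whatever is still buffered
				select {
				case m, ok := <-msgs:
					if !ok {
						return // channel closed
					}
					handle(m)
				default:
					return // buffer empty, safe to exit
				}
			}
		case m, ok := <-msgs:
			if !ok {
				return // channel closed by the producer
			}
			handle(m)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	msgs := make(chan string, 4)
	msgs <- "a"
	msgs <- "b"
	cancel() // shut down with two messages still buffered
	consume(ctx, msgs, func(m string) { fmt.Println("processed", m) })
}

The inner default case is what turns the drain into a non-blocking sweep: once the buffer is empty the goroutine exits instead of waiting for messages that will never arrive.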

View File

@@ -408,7 +408,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
return m.FromCheckpoint(dir, from, altFormat)
}
cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory")
cclog.Info("[METRICSTORE]> No valid checkpoint files found")
return 0, nil
}

View File

@@ -19,36 +19,49 @@ const (
DefaultAvroCheckpointInterval = time.Minute
)
type Checkpoints struct {
FileFormat string `json:"file-format"`
Interval string `json:"interval"`
RootDir string `json:"directory"`
}
type Debug struct {
DumpToFile string `json:"dump-to-file"`
EnableGops bool `json:"gops"`
}
type Archive struct {
ArchiveInterval string `json:"interval"`
RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"`
}
type Subscriptions []struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
}
type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
NumWorkers int `json:"num-workers"`
Checkpoints struct {
FileFormat string `json:"file-format"`
Interval string `json:"interval"`
RootDir string `json:"directory"`
Restore string `json:"restore"`
} `json:"checkpoints"`
Debug struct {
DumpToFile string `json:"dump-to-file"`
EnableGops bool `json:"gops"`
} `json:"debug"`
RetentionInMemory string `json:"retention-in-memory"`
Archive struct {
Interval string `json:"interval"`
RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"`
} `json:"archive"`
Subscriptions []struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
} `json:"subscriptions"`
NumWorkers int `json:"num-workers"`
RetentionInMemory string `json:"retention-in-memory"`
MemoryCap int `json:"memory-cap"`
Checkpoints Checkpoints `json:"checkpoints"`
Debug *Debug `json:"debug"`
Archive *Archive `json:"archive"`
Subscriptions *Subscriptions `json:"nats-subscriptions"`
}
var Keys MetricStoreConfig
var Keys MetricStoreConfig = MetricStoreConfig{
Checkpoints: Checkpoints{
FileFormat: "avro",
RootDir: "./var/checkpoints",
},
}
// AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time!
type AggregationStrategy int
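
Why pre-populate Keys? Decoding only overwrites fields present in the input, so a partial config keeps the defaults, and a missing optional section leaves its pointer nil. A hypothetical illustration, assuming the types and the Keys variable above:

package metricstore

import "encoding/json"

// loadConfig is a hypothetical helper, not part of the commit: decoding
// on top of the pre-populated Keys keeps FileFormat at "avro" unless the
// user overrides it, and leaves Debug, Archive and Subscriptions nil
// (i.e. disabled) when their sections are absent.
func loadConfig(raw json.RawMessage) error {
	return json.Unmarshal(raw, &Keys)
}

// archivingEnabled shows the nil-pointer convention the restructuring
// enables: a simple presence check instead of sentinel zero values.
func archivingEnabled() bool {
	return Keys.Archive != nil
}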

View File

@@ -6,90 +6,72 @@
package metricstore
const configSchema = `{
"type": "object",
"description": "Configuration specific to built-in metric-store.",
"properties": {
"checkpoints": {
"description": "Configuration for checkpointing the metrics within metric-store",
"type": "object",
"properties": {
"file-format": {
"description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
"type": "string"
},
"interval": {
"description": "Interval at which the metrics should be checkpointed.",
"type": "string"
},
"directory": {
"description": "Specify the parent directy in which the checkpointed files should be placed.",
"type": "string"
},
"restore": {
"description": "When cc-backend starts up, look for checkpointed files that are less than X hours old and load metrics from these selected checkpoint files.",
"type": "string"
}
}
"type": "object",
"description": "Configuration specific to built-in metric-store.",
"properties": {
"num-workers": {
"description": "Number of concurrent workers for checkpoint and archive operations",
"type": "integer"
},
"checkpoints": {
"description": "Configuration for checkpointing the metrics within metric-store",
"type": "object",
"properties": {
"file-format": {
"description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
"type": "string"
},
"archive": {
"description": "Configuration for archiving the already checkpointed files.",
"type": "object",
"properties": {
"interval": {
"description": "Interval at which the checkpointed files should be archived.",
"type": "string"
},
"directory": {
"description": "Specify the parent directy in which the archived files should be placed.",
"type": "string"
}
}
"interval": {
"description": "Interval at which the metrics should be checkpointed.",
"type": "string"
},
"retention-in-memory": {
"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
"type": "string"
},
"nats": {
"description": "Configuration for accepting published data through NATS.",
"type": "array",
"items": {
"type": "object",
"properties": {
"address": {
"description": "Address of the NATS server.",
"type": "string"
},
"username": {
"description": "Optional: If configured with username/password method.",
"type": "string"
},
"password": {
"description": "Optional: If configured with username/password method.",
"type": "string"
},
"creds-file-path": {
"description": "Optional: If configured with Credential File method. Path to your NATS cred file.",
"type": "string"
},
"subscriptions": {
"description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.",
"type": "array",
"items": {
"type": "object",
"properties": {
"subscribe-to": {
"description": "Channel name",
"type": "string"
},
"cluster-tag": {
"description": "Optional: Allow lines without a cluster tag, use this as default",
"type": "string"
}
}
}
}
}
}
"directory": {
"description": "Specify the parent directy in which the checkpointed files should be placed.",
"type": "string"
}
},
"required": ["interval"]
},
"archive": {
"description": "Configuration for archiving the already checkpointed files.",
"type": "object",
"properties": {
"interval": {
"description": "Interval at which the checkpointed files should be archived.",
"type": "string"
},
"directory": {
"description": "Specify the directy in which the archived files should be placed.",
"type": "string"
}
},
"required": ["interval", "directory"]
},
"retention-in-memory": {
"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
"type": "string"
},
"memory-cap": {
"description": "Upper memory capacity limit used by metricstore in GB",
"type": "integer"
},
"nats-subscriptions": {
"description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.",
"type": "array",
"items": {
"type": "object",
"properties": {
"subscribe-to": {
"description": "Channel name",
"type": "string"
},
"cluster-tag": {
"description": "Optional: Allow lines without a cluster tag, use this as default",
"type": "string"
}
}
}
}
},
"required": ["checkpoints", "retention-in-memory"]
}`
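
For reference, a complete configuration that validates against this schema; all values are illustrative:

{
  "num-workers": 4,
  "checkpoints": {
    "file-format": "avro",
    "interval": "1h",
    "directory": "./var/checkpoints"
  },
  "archive": {
    "interval": "24h",
    "directory": "./var/archive"
  },
  "retention-in-memory": "48h",
  "memory-cap": 16,
  "nats-subscriptions": [
    {
      "subscribe-to": "hpc-metrics",
      "cluster-tag": "testcluster"
    }
  ]
}

Only "checkpoints" and "retention-in-memory" are required at the top level; omitting "archive" or "nats-subscriptions" disables those subsystems.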

View File

@@ -29,29 +29,30 @@ func ReceiveNats(ms *MemoryStore,
}
var wg sync.WaitGroup
msgs := make(chan []byte, workers*2)
for _, sc := range Keys.Subscriptions {
for _, sc := range *Keys.Subscriptions {
clusterTag := sc.ClusterTag
if workers > 1 {
wg.Add(workers)
for range workers {
go func() {
defer wg.Done()
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m)
if err := DecodeLine(dec, ms, clusterTag); err != nil {
cclog.Errorf("error: %s", err.Error())
}
}
wg.Done()
}()
}
nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
msgs <- data
select {
case msgs <- data:
case <-ctx.Done():
}
})
} else {
nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
@@ -64,7 +65,11 @@ func ReceiveNats(ms *MemoryStore,
cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo)
}
close(msgs)
go func() {
<-ctx.Done()
close(msgs)
}()
wg.Wait()
return nil
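
The two changes above fix the hang together: the subscription callback no longer blocks forever on a full channel, and msgs is closed only after the context is cancelled instead of immediately after subscribing. A self-contained sketch of the resulting lifecycle, with placeholder names:

package main

import (
	"context"
	"sync"
)

func run(ctx context.Context, workers int, decode func([]byte)) {
	var wg sync.WaitGroup
	msgs := make(chan []byte, workers*2)

	wg.Add(workers)
	for range workers {
		go func() {
			defer wg.Done()
			for m := range msgs { // exits once msgs is closed
				decode(m)
			}
		}()
	}

	// Stand-in for the NATS subscription callback: a cancelled context
	// unblocks a callback that is stuck on a full channel.
	publish := func(data []byte) {
		select {
		case msgs <- data:
		case <-ctx.Done():
		}
	}
	for i := range 4 {
		publish([]byte{byte(i)})
	}

	go func() {
		<-ctx.Done()
		close(msgs) // closed only after cancellation, not right after subscribing
	}()
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown; run still drains and returns cleanly
	run(ctx, 2, func([]byte) {})
}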

View File

@@ -25,6 +25,7 @@ import (
"encoding/json"
"errors"
"runtime"
"slices"
"sync"
"time"
@@ -61,7 +62,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
if rawConfig != nil {
config.Validate(configSchema, rawConfig)
dec := json.NewDecoder(bytes.NewReader(rawConfig))
// dec.DisallowUnknownFields()
dec.DisallowUnknownFields()
if err := dec.Decode(&Keys); err != nil {
cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error())
}
@@ -103,7 +104,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
ms := GetMemoryStore()
d, err := time.ParseDuration(Keys.Checkpoints.Restore)
d, err := time.ParseDuration(Keys.RetentionInMemory)
if err != nil {
cclog.Fatal(err)
}
@@ -128,11 +129,21 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
ctx, shutdown := context.WithCancel(context.Background())
wg.Add(4)
retentionGoroutines := 1
checkpointingGoroutines := 1
dataStagingGoroutines := 1
archivingGoroutines := 0
if Keys.Archive != nil {
archivingGoroutines = 1
}
totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines
wg.Add(totalGoroutines)
Retention(wg, ctx)
Checkpointing(wg, ctx)
Archiving(wg, ctx)
if Keys.Archive != nil {
Archiving(wg, ctx)
}
DataStaging(wg, ctx)
// Note: Signal handling has been removed from this function.
@@ -141,9 +152,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
// Store the shutdown function for later use by Shutdown()
shutdownFunc = shutdown
err = ReceiveNats(ms, 1, ctx)
if err != nil {
cclog.Fatal(err)
if Keys.Subscriptions != nil {
err = ReceiveNats(ms, 1, ctx)
if err != nil {
cclog.Fatal(err)
}
}
}
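
The goroutine arithmetic above restores the invariant that broke shutdown: wg.Add must equal the number of goroutines actually started, and the old unconditional wg.Add(4) left wg.Wait hanging whenever archiving was disabled. A hypothetical helper that enforces the pairing:

package metricstore

import (
	"context"
	"sync"
)

// startService is a hypothetical alternative to the explicit counting in
// Init, not part of the commit: every wg.Add(1) is paired with exactly
// one launched service, so a skipped optional service can never leave
// Shutdown waiting on a Done() that will not come.
func startService(wg *sync.WaitGroup, ctx context.Context, enabled bool,
	svc func(*sync.WaitGroup, context.Context),
) {
	if !enabled {
		return
	}
	wg.Add(1)
	svc(wg, ctx) // svc spawns its goroutine and calls wg.Done on exit
}

With it, the call sites collapse to startService(wg, ctx, true, Retention) and startService(wg, ctx, Keys.Archive != nil, Archiving).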
@@ -184,11 +197,14 @@ func GetMemoryStore() *MemoryStore {
}
func Shutdown() {
// Cancel the context to signal all background goroutines to stop
if shutdownFunc != nil {
shutdownFunc()
}
if Keys.Checkpoints.FileFormat != "json" {
close(LineProtocolMessages)
}
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
var files int
var err error
@@ -199,7 +215,6 @@ func Shutdown() {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
} else {
files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true)
close(LineProtocolMessages)
}
if err != nil {
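
Condensed, the shutdown order this hunk establishes is: cancel the context, close the staging channel, then take the final checkpoint. A simplified sketch with stand-in names (the avro/json file-format check around the close is omitted):

package metricstore

// shutdownSequence is a hypothetical condensation, not the actual
// Shutdown: cancel stops the background goroutines, closing msgs lets
// DataStaging drain and exit, and only then is the remaining in-memory
// data checkpointed.
func shutdownSequence(cancel func(), msgs chan string, checkpoint func() error) error {
	if cancel != nil {
		cancel() // 1. signal all background goroutines to stop
	}
	close(msgs) // 2. DataStaging drains remaining messages and exits
	return checkpoint() // 3. persist what is still in memory
}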
@@ -314,11 +329,8 @@ func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]str
// Check if the key exists in our exclusion map
if excludedValues, exists := excludeSelectors[key]; exists {
// The key exists, now check if the specific value is in the exclusion list
for _, ev := range excludedValues {
if ev == value {
exclude = true
break
}
if slices.Contains(excludedValues, value) {
exclude = true
}
}