Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2026-01-16 09:41:47 +01:00)
Merge branch 'dev' of https://github.com/ClusterCockpit/cc-backend into dev
@@ -271,6 +271,7 @@ func initSubsystems() error {
 	// Initialize job archive
 	archiveCfg := ccconf.GetPackageConfig("archive")
 	if archiveCfg == nil {
+		cclog.Debug("Archive configuration not found, using default archive configuration")
 		archiveCfg = json.RawMessage(defaultArchiveConfig)
 	}
 	if err := archive.Init(archiveCfg, config.Keys.DisableArchive); err != nil {
@@ -319,8 +320,13 @@ func runServer(ctx context.Context) error {
 	mscfg := ccconf.GetPackageConfig("metric-store")
 	if mscfg != nil {
 		metricstore.Init(mscfg, &wg)
+
+		// Inject repository as NodeProvider to break import cycle
+		ms := metricstore.GetMemoryStore()
+		jobRepo := repository.GetJobRepository()
+		ms.SetNodeProvider(jobRepo)
 	} else {
-		cclog.Debug("Metric store configuration not found, skipping metricstore initialization")
+		return fmt.Errorf("missing metricstore configuration")
 	}
 
 	// Start archiver and task manager
@@ -375,22 +381,37 @@ func runServer(ctx context.Context) error {
 	}
 	runtime.SystemdNotify(true, "running")
 
-	// Wait for completion or error
+	waitDone := make(chan struct{})
 	go func() {
 		wg.Wait()
+		close(waitDone)
+	}()
+
+	go func() {
+		<-waitDone
 		close(errChan)
 	}()
 
-	// Check for server startup errors
+	// Wait for either:
+	// 1. An error from server startup
+	// 2. Completion of all goroutines (normal shutdown or crash)
 	select {
 	case err := <-errChan:
+		// errChan will be closed when waitDone is closed, which happens
+		// when all goroutines complete (either from normal shutdown or error)
		if err != nil {
			return err
		}
 	case <-time.After(100 * time.Millisecond):
-		// Server started successfully, wait for completion
-		if err := <-errChan; err != nil {
-			return err
-		}
+		// Give the server 100ms to start and report any immediate startup errors
+		// After that, just wait for normal shutdown completion
+		select {
+		case err := <-errChan:
+			if err != nil {
+				return err
+			}
+		case <-waitDone:
+			// Normal shutdown completed
+		}
 	}
 
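The reworked shutdown logic is a reusable pattern: signal completion by closing a channel, then race a short startup window against it. A minimal, self-contained sketch of the same idea (waitForStartup and the simulated worker are illustrative, not part of cc-backend's API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitForStartup returns the first startup error, or nil once all
// workers have finished. It mirrors the waitDone/errChan pattern above.
func waitForStartup(wg *sync.WaitGroup, errChan chan error) error {
	waitDone := make(chan struct{})
	go func() {
		wg.Wait()
		close(waitDone)
	}()
	go func() {
		<-waitDone
		close(errChan) // receives on the closed channel yield nil
	}()

	select {
	case err := <-errChan:
		return err // immediate startup failure (or nil if already done)
	case <-time.After(100 * time.Millisecond):
		// No immediate error: wait for shutdown or a late error.
		select {
		case err := <-errChan:
			return err
		case <-waitDone:
			return nil
		}
	}
}

func main() {
	var wg sync.WaitGroup
	errChan := make(chan error, 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // simulated server work
	}()
	fmt.Println("result:", waitForStartup(&wg, errChan))
}
```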
@@ -1,55 +1,22 @@
 {
     "main": {
         "addr": "127.0.0.1:8080",
-        "short-running-jobs-duration": 300,
-        "resampling": {
-            "minimumPoints": 600,
-            "trigger": 300,
-            "resolutions": [240, 60]
-        },
-        "apiAllowedIPs": ["*"],
-        "emission-constant": 317
+        "apiAllowedIPs": ["*"]
     },
     "cron": {
-        "commit-job-worker": "2m",
-        "duration-worker": "5m",
-        "footprint-worker": "10m"
-    },
-    "archive": {
-        "kind": "file",
-        "path": "./var/job-archive"
+        "commit-job-worker": "1m",
+        "duration-worker": "3m",
+        "footprint-worker": "5m"
     },
     "auth": {
         "jwts": {
             "max-age": "2000h"
         }
     },
-    "nats": {
-        "address": "nats://0.0.0.0:4222",
-        "username": "root",
-        "password": "root"
-    },
     "metric-store": {
         "checkpoints": {
-            "file-format": "avro",
-            "interval": "1h",
-            "directory": "./var/checkpoints",
-            "restore": "48h"
+            "interval": "12h"
         },
-        "archive": {
-            "interval": "1h",
-            "directory": "./var/archive"
-        },
-        "retention-in-memory": "48h",
-        "subscriptions": [
-            {
-                "subscribe-to": "hpc-nats",
-                "cluster-tag": "fritz"
-            },
-            {
-                "subscribe-to": "hpc-nats",
-                "cluster-tag": "alex"
-            }
-        ]
+        "retention-in-memory": "48h"
     }
 }
@@ -5,9 +5,9 @@
     "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
     "user": "clustercockpit",
     "group": "clustercockpit",
-    "validate": false,
     "apiAllowedIPs": ["*"],
     "short-running-jobs-duration": 300,
+    "enable-job-taggers": true,
     "resampling": {
         "minimumPoints": 600,
         "trigger": 180,
@@ -18,13 +18,48 @@
             "subjectNodeState": "cc.node.state"
         }
     },
+    "nats": {
+        "address": "nats://0.0.0.0:4222",
+        "username": "root",
+        "password": "root"
+    },
+    "auth": {
+        "jwts": {
+            "max-age": "2000h"
+        }
+    },
     "cron": {
         "commit-job-worker": "1m",
         "duration-worker": "5m",
         "footprint-worker": "10m"
     },
     "archive": {
-        "kind": "file",
-        "path": "./var/job-archive"
-    }
+        "kind": "s3",
+        "endpoint": "http://x.x.x.x",
+        "bucket": "jobarchive",
+        "accessKey": "xx",
+        "secretKey": "xx",
+        "retention": {
+            "policy": "move",
+            "age": 365,
+            "location": "./var/archive"
+        }
+    },
+    "metric-store": {
+        "checkpoints": {
+            "interval": "12h"
+        },
+        "retention-in-memory": "48h",
+        "nats-subscriptions": [
+            {
+                "subscribe-to": "hpc-nats",
+                "cluster-tag": "fritz"
+            },
+            {
+                "subscribe-to": "hpc-nats",
+                "cluster-tag": "alex"
+            }
+        ]
+    },
+    "ui-file": "ui-config.json"
 }
@@ -616,9 +616,9 @@ func securedCheck(user *schema.User, r *http.Request) error {
 	}
 	// If SplitHostPort fails, IPAddress is already just a host (no port)
 
-	// If nothing declared in config: deny all request to this api endpoint
+	// If nothing declared in config: Continue
 	if len(config.Keys.APIAllowedIPs) == 0 {
-		return fmt.Errorf("missing configuration key ApiAllowedIPs")
+		return nil
 	}
 	// If wildcard declared in config: Continue
 	if config.Keys.APIAllowedIPs[0] == "*" {
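With this change an empty apiAllowedIPs list means the check passes instead of rejecting every request; only an explicitly configured list still filters by source IP. The host extraction the surrounding comment refers to can be sketched like this (hostOnly is a hypothetical helper, not a cc-backend function):

```go
package main

import (
	"fmt"
	"net"
)

// hostOnly strips the port from a remote address if one is present;
// if SplitHostPort fails, the string is already just a host.
func hostOnly(remoteAddr string) string {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return remoteAddr // no port (or unparsable): use as-is
	}
	return host
}

func main() {
	fmt.Println(hostOnly("192.168.0.1:54321")) // 192.168.0.1
	fmt.Println(hostOnly("192.168.0.1"))       // 192.168.0.1
}
```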
@@ -6,7 +6,7 @@
 package config
 
 var configSchema = `
 {
     "type": "object",
     "properties": {
         "addr": {
@@ -135,6 +135,5 @@ var configSchema = `
         },
         "required": ["subjectJobEvent", "subjectNodeState"]
     }
-    },
-    "required": ["apiAllowedIPs"]
-}`
+    }
+}`
@@ -24,7 +24,7 @@ import (
 func Archiving(wg *sync.WaitGroup, ctx context.Context) {
 	go func() {
 		defer wg.Done()
-		d, err := time.ParseDuration(Keys.Archive.Interval)
+		d, err := time.ParseDuration(Keys.Archive.ArchiveInterval)
 		if err != nil {
 			cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
 		}
@@ -30,8 +30,51 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
 		for {
 			select {
 			case <-ctx.Done():
-				return
-			case val := <-LineProtocolMessages:
+				// Drain any remaining messages in channel before exiting
+				for {
+					select {
+					case val, ok := <-LineProtocolMessages:
+						if !ok {
+							// Channel closed
+							return
+						}
+						// Process remaining message
+						freq, err := GetMetricFrequency(val.MetricName)
+						if err != nil {
+							continue
+						}
+
+						metricName := ""
+						for _, selectorName := range val.Selector {
+							metricName += selectorName + SelectorDelimiter
+						}
+						metricName += val.MetricName
+
+						var selector []string
+						selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
+
+						if !stringSlicesEqual(oldSelector, selector) {
+							avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
+							if avroLevel == nil {
+								cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
+							}
+							oldSelector = slices.Clone(selector)
+						}
+
+						if avroLevel != nil {
+							avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+						}
+					default:
+						// No more messages, exit
+						return
+					}
+				}
+			case val, ok := <-LineProtocolMessages:
+				if !ok {
+					// Channel closed, exit gracefully
+					return
+				}
+
 				// Fetch the frequency of the metric from the global configuration
 				freq, err := GetMetricFrequency(val.MetricName)
 				if err != nil {
@@ -65,7 +108,9 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
 					oldSelector = slices.Clone(selector)
 				}
 
-				avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+				if avroLevel != nil {
+					avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
+				}
 			}
 		}
 	}()
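Both hunks above implement the same shutdown rule: on cancellation, drain whatever is still buffered in LineProtocolMessages before returning, and never call addMetric through a nil avroLevel. The drain skeleton, reduced to a generic channel (drain and process are illustrative names, not cc-backend identifiers):

```go
package main

import (
	"context"
	"fmt"
)

// drain empties ch after cancellation: it processes whatever is still
// buffered and returns as soon as the channel is empty or closed.
func drain(ctx context.Context, ch chan int, process func(int)) {
	for {
		select {
		case <-ctx.Done():
			for {
				select {
				case v, ok := <-ch:
					if !ok {
						return // channel closed
					}
					process(v)
				default:
					return // buffer empty
				}
			}
		case v, ok := <-ch:
			if !ok {
				return
			}
			process(v)
		}
	}
}

func main() {
	ch := make(chan int, 8)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // already cancelled: buffered values are still handled
	drain(ctx, ch, func(v int) { fmt.Println("processed", v) })
}
```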
@@ -408,7 +408,6 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 		return m.FromCheckpoint(dir, from, altFormat)
 	}
 
-	cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory")
 	return 0, nil
 }
 
@@ -19,36 +19,49 @@ const (
 	DefaultAvroCheckpointInterval = time.Minute
 )
 
+type Checkpoints struct {
+	FileFormat string `json:"file-format"`
+	Interval   string `json:"interval"`
+	RootDir    string `json:"directory"`
+}
+
+type Debug struct {
+	DumpToFile string `json:"dump-to-file"`
+	EnableGops bool   `json:"gops"`
+}
+
+type Archive struct {
+	ArchiveInterval string `json:"interval"`
+	RootDir         string `json:"directory"`
+	DeleteInstead   bool   `json:"delete-instead"`
+}
+
+type Subscriptions []struct {
+	// Channel name
+	SubscribeTo string `json:"subscribe-to"`
+
+	// Allow lines without a cluster tag, use this as default, optional
+	ClusterTag string `json:"cluster-tag"`
+}
+
 type MetricStoreConfig struct {
 	// Number of concurrent workers for checkpoint and archive operations.
 	// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
 	NumWorkers int `json:"num-workers"`
-	Checkpoints struct {
-		FileFormat string `json:"file-format"`
-		Interval   string `json:"interval"`
-		RootDir    string `json:"directory"`
-		Restore    string `json:"restore"`
-	} `json:"checkpoints"`
-	Debug struct {
-		DumpToFile string `json:"dump-to-file"`
-		EnableGops bool   `json:"gops"`
-	} `json:"debug"`
-	RetentionInMemory string `json:"retention-in-memory"`
-	Archive struct {
-		Interval      string `json:"interval"`
-		RootDir       string `json:"directory"`
-		DeleteInstead bool   `json:"delete-instead"`
-	} `json:"archive"`
-	Subscriptions []struct {
-		// Channel name
-		SubscribeTo string `json:"subscribe-to"`
-
-		// Allow lines without a cluster tag, use this as default, optional
-		ClusterTag string `json:"cluster-tag"`
-	} `json:"subscriptions"`
+	RetentionInMemory string         `json:"retention-in-memory"`
+	MemoryCap         int            `json:"memory-cap"`
+	Checkpoints       Checkpoints    `json:"checkpoints"`
+	Debug             *Debug         `json:"debug"`
+	Archive           *Archive       `json:"archive"`
+	Subscriptions     *Subscriptions `json:"nats-subscriptions"`
 }
 
-var Keys MetricStoreConfig
+var Keys MetricStoreConfig = MetricStoreConfig{
+	Checkpoints: Checkpoints{
+		FileFormat: "avro",
+		RootDir:    "./var/checkpoints",
+	},
+}
 
 // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time!
 type AggregationStrategy int
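Note two conventions in the reworked types: defaults (avro file format, ./var/checkpoints) now live in the initialized Keys value, and optional sections are pointers, so nil can mean "not configured" — which is what Init later tests with Keys.Subscriptions != nil. A reduced sketch of how encoding/json interacts with both (Config and keys are illustrative names):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Checkpoints struct {
	FileFormat string `json:"file-format"`
	RootDir    string `json:"directory"`
}

type Config struct {
	Checkpoints Checkpoints `json:"checkpoints"`
	// A pointer section stays nil when the key is missing entirely.
	Archive *struct {
		Interval string `json:"interval"`
	} `json:"archive"`
}

// Defaults are baked into the variable, then selectively overwritten
// by whatever keys the JSON actually contains.
var keys = Config{
	Checkpoints: Checkpoints{FileFormat: "avro", RootDir: "./var/checkpoints"},
}

func main() {
	raw := []byte(`{"checkpoints": {"directory": "/data/cp"}}`)
	if err := json.Unmarshal(raw, &keys); err != nil {
		panic(err)
	}
	fmt.Println(keys.Checkpoints.FileFormat) // avro (default kept)
	fmt.Println(keys.Checkpoints.RootDir)    // /data/cp (overridden)
	fmt.Println(keys.Archive == nil)         // true: section not configured
}
```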
@@ -6,90 +6,72 @@
 package metricstore
 
 const configSchema = `{
     "type": "object",
     "description": "Configuration specific to built-in metric-store.",
     "properties": {
-        "checkpoints": {
-            "description": "Configuration for checkpointing the metrics within metric-store",
-            "type": "object",
-            "properties": {
-                "file-format": {
-                    "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
-                    "type": "string"
-                },
-                "interval": {
-                    "description": "Interval at which the metrics should be checkpointed.",
-                    "type": "string"
-                },
-                "directory": {
-                    "description": "Specify the parent directy in which the checkpointed files should be placed.",
-                    "type": "string"
-                },
-                "restore": {
-                    "description": "When cc-backend starts up, look for checkpointed files that are less than X hours old and load metrics from these selected checkpoint files.",
-                    "type": "string"
-                }
-            }
-        },
-        "archive": {
-            "description": "Configuration for archiving the already checkpointed files.",
-            "type": "object",
-            "properties": {
-                "interval": {
-                    "description": "Interval at which the checkpointed files should be archived.",
-                    "type": "string"
-                },
-                "directory": {
-                    "description": "Specify the parent directy in which the archived files should be placed.",
-                    "type": "string"
-                }
-            }
-        },
-        "retention-in-memory": {
-            "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
-            "type": "string"
-        },
-        "nats": {
-            "description": "Configuration for accepting published data through NATS.",
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "address": {
-                        "description": "Address of the NATS server.",
-                        "type": "string"
-                    },
-                    "username": {
-                        "description": "Optional: If configured with username/password method.",
-                        "type": "string"
-                    },
-                    "password": {
-                        "description": "Optional: If configured with username/password method.",
-                        "type": "string"
-                    },
-                    "creds-file-path": {
-                        "description": "Optional: If configured with Credential File method. Path to your NATS cred file.",
-                        "type": "string"
-                    },
-                    "subscriptions": {
-                        "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.",
-                        "type": "array",
-                        "items": {
-                            "type": "object",
-                            "properties": {
-                                "subscribe-to": {
-                                    "description": "Channel name",
-                                    "type": "string"
-                                },
-                                "cluster-tag": {
-                                    "description": "Optional: Allow lines without a cluster tag, use this as default",
-                                    "type": "string"
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
+        "num-workers": {
+            "description": "Number of concurrent workers for checkpoint and archive operations",
+            "type": "integer"
+        },
+        "checkpoints": {
+            "description": "Configuration for checkpointing the metrics within metric-store",
+            "type": "object",
+            "properties": {
+                "file-format": {
+                    "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.",
+                    "type": "string"
+                },
+                "interval": {
+                    "description": "Interval at which the metrics should be checkpointed.",
+                    "type": "string"
+                },
+                "directory": {
+                    "description": "Specify the parent directy in which the checkpointed files should be placed.",
+                    "type": "string"
+                }
+            },
+            "required": ["interval"]
+        },
+        "archive": {
+            "description": "Configuration for archiving the already checkpointed files.",
+            "type": "object",
+            "properties": {
+                "interval": {
+                    "description": "Interval at which the checkpointed files should be archived.",
+                    "type": "string"
+                },
+                "directory": {
+                    "description": "Specify the directy in which the archived files should be placed.",
+                    "type": "string"
+                }
+            },
+            "required": ["interval", "directory"]
+        },
+        "retention-in-memory": {
+            "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
+            "type": "string"
+        },
+        "memory-cap": {
+            "description": "Upper memory capacity limit used by metricstore in GB",
+            "type": "integer"
+        },
+        "nats-subscriptions": {
+            "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.",
+            "type": "array",
+            "items": {
+                "type": "object",
+                "properties": {
+                    "subscribe-to": {
+                        "description": "Channel name",
+                        "type": "string"
+                    },
+                    "cluster-tag": {
+                        "description": "Optional: Allow lines without a cluster tag, use this as default",
+                        "type": "string"
+                    }
+                }
+            }
+        }
+    },
+    "required": ["checkpoints", "retention-in-memory"]
 }`
@@ -29,29 +29,30 @@ func ReceiveNats(ms *MemoryStore,
 	}
 
 	var wg sync.WaitGroup
 
 	msgs := make(chan []byte, workers*2)
 
-	for _, sc := range Keys.Subscriptions {
+	for _, sc := range *Keys.Subscriptions {
 		clusterTag := sc.ClusterTag
 		if workers > 1 {
 			wg.Add(workers)
 
 			for range workers {
 				go func() {
+					defer wg.Done()
 					for m := range msgs {
 						dec := lineprotocol.NewDecoderWithBytes(m)
 						if err := DecodeLine(dec, ms, clusterTag); err != nil {
 							cclog.Errorf("error: %s", err.Error())
 						}
 					}
-
-					wg.Done()
 				}()
 			}
 
 			nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
-				msgs <- data
+				select {
+				case msgs <- data:
+				case <-ctx.Done():
+				}
 			})
 		} else {
 			nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
@@ -64,7 +65,11 @@ func ReceiveNats(ms *MemoryStore,
 		cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo)
 	}
 
-	close(msgs)
+	go func() {
+		<-ctx.Done()
+		close(msgs)
+	}()
+
 	wg.Wait()
 
 	return nil
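Two shutdown fixes land in this file: the subscription callback now sends with a select so it cannot block forever on a full msgs channel after cancellation, and msgs is closed by a watcher goroutine only once the context is done, which lets the workers' range loops terminate. The essential shape, with the NATS callback stood in by a plain function (names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	msgs := make(chan []byte, 4)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // worker: ranges until msgs is closed
		defer wg.Done()
		for m := range msgs {
			fmt.Println("decoded", string(m))
		}
	}()

	// Producer (stands in for the NATS callback): never blocks past cancellation.
	send := func(data []byte) {
		select {
		case msgs <- data:
		case <-ctx.Done():
		}
	}
	send([]byte("line1"))
	send([]byte("line2"))

	// Watcher: close msgs only once the context is cancelled, which in
	// turn ends the worker's range loop.
	go func() {
		<-ctx.Done()
		close(msgs)
	}()

	cancel()
	wg.Wait()
}
```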
@@ -25,6 +25,7 @@ import (
 	"encoding/json"
 	"errors"
 	"runtime"
+	"slices"
 	"sync"
 	"time"
 
@@ -44,6 +45,15 @@ var (
 	shutdownFunc context.CancelFunc
 )
 
+// NodeProvider provides information about nodes currently in use by running jobs.
+// This interface allows metricstore to query job information without directly
+// depending on the repository package, breaking the import cycle.
+type NodeProvider interface {
+	// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
+	// that are currently in use by jobs that started before the given timestamp.
+	GetUsedNodes(ts int64) (map[string][]string, error)
+}
+
 type Metric struct {
 	Name  string
 	Value schema.Float
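Declaring the interface on the consuming side is the usual Go remedy for an import cycle: metricstore names only the behavior it needs, and the repository's job type satisfies it implicitly. A toy version of the wiring, condensed into one file for brevity (the node names come from example data elsewhere in this commit):

```go
package main

import "fmt"

// Consumer side: declare just the capability that is needed.
type NodeProvider interface {
	GetUsedNodes(ts int64) (map[string][]string, error)
}

type MemoryStore struct {
	nodeProvider NodeProvider
}

func (ms *MemoryStore) SetNodeProvider(p NodeProvider) { ms.nodeProvider = p }

// Provider side: any type with the right method satisfies the interface,
// with no import from the provider back to the consumer.
type JobRepository struct{}

func (r *JobRepository) GetUsedNodes(ts int64) (map[string][]string, error) {
	return map[string][]string{"fritz": {"f0201", "f0202"}}, nil
}

func main() {
	ms := &MemoryStore{}
	ms.SetNodeProvider(&JobRepository{}) // wiring done by the caller, as in runServer
	nodes, _ := ms.nodeProvider.GetUsedNodes(0)
	fmt.Println(nodes)
}
```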
@@ -51,8 +61,9 @@ type Metric struct {
 }
 
 type MemoryStore struct {
 	Metrics      map[string]MetricConfig
 	root         Level
+	nodeProvider NodeProvider // Injected dependency for querying running jobs
 }
 
 func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
@@ -61,7 +72,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 	if rawConfig != nil {
 		config.Validate(configSchema, rawConfig)
 		dec := json.NewDecoder(bytes.NewReader(rawConfig))
-		// dec.DisallowUnknownFields()
+		dec.DisallowUnknownFields()
 		if err := dec.Decode(&Keys); err != nil {
 			cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error())
 		}
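With DisallowUnknownFields enabled, a misspelled key in the metric-store block now fails the decode (and thus aborts startup) instead of being silently ignored. A minimal demonstration of the difference:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type cfg struct {
	Interval string `json:"interval"`
}

func main() {
	raw := []byte(`{"intervall": "1h"}`) // note the typo

	var lenient cfg
	_ = json.Unmarshal(raw, &lenient) // typo silently dropped
	fmt.Printf("lenient: %+v\n", lenient)

	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	var strict cfg
	if err := dec.Decode(&strict); err != nil {
		fmt.Println("strict:", err) // json: unknown field "intervall"
	}
}
```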
@@ -103,7 +114,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 
 	ms := GetMemoryStore()
 
-	d, err := time.ParseDuration(Keys.Checkpoints.Restore)
+	d, err := time.ParseDuration(Keys.RetentionInMemory)
 	if err != nil {
 		cclog.Fatal(err)
 	}
@@ -128,7 +139,13 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 
 	ctx, shutdown := context.WithCancel(context.Background())
 
-	wg.Add(4)
+	retentionGoroutines := 1
+	checkpointingGoroutines := 1
+	dataStagingGoroutines := 1
+	archivingGoroutines := 1
+
+	totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines
+	wg.Add(totalGoroutines)
 
 	Retention(wg, ctx)
 	Checkpointing(wg, ctx)
@@ -141,9 +158,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 	// Store the shutdown function for later use by Shutdown()
 	shutdownFunc = shutdown
 
-	err = ReceiveNats(ms, 1, ctx)
-	if err != nil {
-		cclog.Fatal(err)
+	if Keys.Subscriptions != nil {
+		err = ReceiveNats(ms, 1, ctx)
+		if err != nil {
+			cclog.Fatal(err)
+		}
 	}
 }
 
@@ -183,12 +202,23 @@ func GetMemoryStore() *MemoryStore {
 	return msInstance
 }
 
+// SetNodeProvider sets the NodeProvider implementation for the MemoryStore.
+// This must be called during initialization to provide job state information
+// for selective buffer retention during Free operations.
+// If not set, the Free function will fall back to freeing all buffers.
+func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
+	ms.nodeProvider = provider
+}
+
 func Shutdown() {
-	// Cancel the context to signal all background goroutines to stop
 	if shutdownFunc != nil {
 		shutdownFunc()
 	}
 
+	if Keys.Checkpoints.FileFormat != "json" {
+		close(LineProtocolMessages)
+	}
+
 	cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
 	var files int
 	var err error
@@ -199,7 +229,6 @@ func Shutdown() {
 		files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
 	} else {
 		files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true)
-		close(LineProtocolMessages)
 	}
 
 	if err != nil {
@@ -248,18 +277,15 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 }
 
 func Free(ms *MemoryStore, t time.Time) (int, error) {
-	// jobRepo := repository.GetJobRepository()
-	// excludeSelectors, err := jobRepo.GetUsedNodes(t.Unix())
-	// if err != nil {
-	// 	return 0, err
-	// }
-
-	excludeSelectors := make(map[string][]string, 0)
-
-	// excludeSelectors := map[string][]string{
-	// 	"alex": {"a0122", "a0123", "a0225"},
-	// 	"fritz": {"f0201", "f0202"},
-	// }
+	// If no NodeProvider is configured, free all buffers older than t
+	if ms.nodeProvider == nil {
+		return ms.Free(nil, t.Unix())
+	}
+
+	excludeSelectors, err := ms.nodeProvider.GetUsedNodes(t.Unix())
+	if err != nil {
+		return 0, err
+	}
 
 	switch lenMap := len(excludeSelectors); lenMap {
 
|||||||
// Check if the key exists in our exclusion map
|
// Check if the key exists in our exclusion map
|
||||||
if excludedValues, exists := excludeSelectors[key]; exists {
|
if excludedValues, exists := excludeSelectors[key]; exists {
|
||||||
// The key exists, now check if the specific value is in the exclusion list
|
// The key exists, now check if the specific value is in the exclusion list
|
||||||
for _, ev := range excludedValues {
|
if slices.Contains(excludedValues, value) {
|
||||||
if ev == value {
|
exclude = true
|
||||||
exclude = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,9 +350,6 @@ func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]str
 		}
 	}
 
-	// fmt.Printf("All selectors: %#v\n\n", allSelectors)
-	// fmt.Printf("filteredSelectors: %#v\n\n", filteredSelectors)
-
 	return filteredSelectors
 }
 
@@ -23,7 +23,8 @@ func RegisterLdapSyncService(ds string) {
 
 	auth := auth.GetAuthInstance()
 
-	cclog.Info("Register LDAP sync service")
+	cclog.Infof("register ldap sync service with %s interval", ds)
+
 	s.NewJob(gocron.DurationJob(interval),
 		gocron.NewTask(
 			func() {
@@ -32,6 +33,5 @@ func RegisterLdapSyncService(ds string) {
 				if err := auth.LdapAuth.Sync(); err != nil {
 					cclog.Errorf("ldap sync failed: %s", err.Error())
 				}
-				cclog.Print("ldap sync done")
 			}))
 }
@@ -17,6 +17,10 @@ import (
 	"github.com/go-co-op/gocron/v2"
 )
 
+const (
+	DefaultCompressOlderThan = 7
+)
+
 // Retention defines the configuration for job retention policies.
 type Retention struct {
 	Policy string `json:"policy"`
@@ -60,6 +64,38 @@ func parseDuration(s string) (time.Duration, error) {
 	return interval, nil
 }
 
+func initArchiveServices(config json.RawMessage) {
+	var cfg struct {
+		Retention   Retention `json:"retention"`
+		Compression int       `json:"compression"`
+	}
+	cfg.Retention.IncludeDB = true
+
+	if err := json.Unmarshal(config, &cfg); err != nil {
+		cclog.Errorf("error while unmarshaling raw config json: %v", err)
+	}
+
+	switch cfg.Retention.Policy {
+	case "delete":
+		RegisterRetentionDeleteService(
+			cfg.Retention.Age,
+			cfg.Retention.IncludeDB,
+			cfg.Retention.OmitTagged)
+	case "move":
+		RegisterRetentionMoveService(
+			cfg.Retention.Age,
+			cfg.Retention.IncludeDB,
+			cfg.Retention.Location,
+			cfg.Retention.OmitTagged)
+	}
+
+	if cfg.Compression > 0 {
+		RegisterCompressionService(cfg.Compression)
+	} else {
+		RegisterCompressionService(DefaultCompressOlderThan)
+	}
+}
+
 // Start initializes the task manager, parses configurations, and registers background tasks.
 // It starts the gocron scheduler.
 func Start(cronCfg, archiveConfig json.RawMessage) {
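initArchiveServices centralizes what Start used to inline: decode the archive block's retention and compression keys, dispatch on the policy, and fall back to compressing after DefaultCompressOlderThan days. A self-contained sketch of that dispatch, driven by the retention block from the sample configuration above, with the Register* services replaced by prints:

```go
package main

import (
	"encoding/json"
	"fmt"
)

const DefaultCompressOlderThan = 7

type Retention struct {
	Policy   string `json:"policy"`
	Age      int    `json:"age"`
	Location string `json:"location"`
}

func main() {
	// Archive block as in the sample configuration above.
	raw := []byte(`{"retention": {"policy": "move", "age": 365,
		"location": "./var/archive"}, "compression": 0}`)

	var cfg struct {
		Retention   Retention `json:"retention"`
		Compression int       `json:"compression"`
	}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		fmt.Println("unmarshal:", err)
	}

	switch cfg.Retention.Policy {
	case "delete":
		fmt.Println("would delete jobs older than", cfg.Retention.Age, "days")
	case "move":
		fmt.Println("would move jobs older than", cfg.Retention.Age, "days to", cfg.Retention.Location)
	}

	// Compression is always registered; 0 falls back to the default.
	if cfg.Compression > 0 {
		fmt.Println("compress after", cfg.Compression, "days")
	} else {
		fmt.Println("compress after", DefaultCompressOlderThan, "days (default)")
	}
}
```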
@@ -80,32 +116,11 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
 		cclog.Errorf("error while decoding cron config: %v", err)
 	}
 
-	var cfg struct {
-		Retention   Retention `json:"retention"`
-		Compression int       `json:"compression"`
-	}
-	cfg.Retention.IncludeDB = true
-
-	if err := json.Unmarshal(archiveConfig, &cfg); err != nil {
-		cclog.Warn("Error while unmarshaling raw config json")
-	}
-
-	switch cfg.Retention.Policy {
-	case "delete":
-		RegisterRetentionDeleteService(
-			cfg.Retention.Age,
-			cfg.Retention.IncludeDB,
-			cfg.Retention.OmitTagged)
-	case "move":
-		RegisterRetentionMoveService(
-			cfg.Retention.Age,
-			cfg.Retention.IncludeDB,
-			cfg.Retention.Location,
-			cfg.Retention.OmitTagged)
-	}
-
-	if cfg.Compression > 0 {
-		RegisterCompressionService(cfg.Compression)
+	if archiveConfig != nil {
+		initArchiveServices(archiveConfig)
+	} else {
+		// Always enable compression
+		RegisterCompressionService(DefaultCompressOlderThan)
 	}
 
 	lc := auth.Keys.LdapConfig
@@ -77,7 +77,7 @@ type MessageHandler func(subject string, data []byte)
 func Connect() {
 	clientOnce.Do(func() {
 		if Keys.Address == "" {
-			cclog.Warn("NATS: no address configured, skipping connection")
+			cclog.Info("NATS: no address configured, skipping connection")
 			return
 		}
 
@@ -77,14 +77,14 @@ type PlotConfiguration struct {
 var UIDefaults = WebConfig{
 	JobList: JobListConfig{
 		UsePaging:     false,
-		ShowFootprint: true,
+		ShowFootprint: false,
 	},
 	NodeList: NodeListConfig{
 		UsePaging: true,
 	},
 	JobView: JobViewConfig{
 		ShowPolarPlot: true,
-		ShowFootprint: true,
+		ShowFootprint: false,
 		ShowRoofline:  true,
 		ShowStatTable: true,
 	},