diff --git a/cmd/cc-metric-store/main.go b/cmd/cc-metric-store/main.go
index 3654b3b..41efa3f 100644
--- a/cmd/cc-metric-store/main.go
+++ b/cmd/cc-metric-store/main.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/ClusterCockpit/cc-metric-store/internal/api"
+	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
 	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
 	"github.com/ClusterCockpit/cc-metric-store/internal/runtimeEnv"
@@ -95,6 +96,7 @@ func main() {
 	memorystore.Retention(&wg, ctx)
 	memorystore.Checkpointing(&wg, ctx)
 	memorystore.Archiving(&wg, ctx)
+	avro.DataStaging(&wg, ctx)
 
 	r := http.NewServeMux()
 	api.MountRoutes(r)
diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go
index 2e1e5ca..a2c2066 100644
--- a/internal/api/lineprotocol.go
+++ b/internal/api/lineprotocol.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
 	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
 	"github.com/ClusterCockpit/cc-metric-store/internal/util"
@@ -197,6 +198,8 @@ func decodeLine(dec *lineprotocol.Decoder,
 
 	var lvl *memorystore.Level = nil
 	prevCluster, prevHost := "", ""
 
+	var avroBuf avro.AvroStruct
+	var ok bool
 	for dec.Next() {
 		rawmeasurement, err := dec.Measurement()
@@ -329,6 +332,17 @@ func decodeLine(dec *lineprotocol.Decoder,
 			return fmt.Errorf("host %s: timestamp : %#v with error : %#v", host, t, err.Error())
 		}
 
+		if config.Keys.Checkpoints.FileFormat != "json" {
+			avroBuf.MetricName = string(metricBuf)
+			avroBuf.Cluster = cluster
+			avroBuf.Node = host
+			avroBuf.Selector = selector
+			avroBuf.Value = metric.Value
+			avroBuf.Timestamp = t.Unix()
+
+			avro.LineProtocolMessages <- avroBuf
+		}
+
 		if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
 			return err
 		}
diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go
new file mode 100644
index 0000000..52f696f
--- /dev/null
+++ b/internal/avro/avroHelper.go
@@ -0,0 +1,72 @@
+package avro
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+
+	"github.com/ClusterCockpit/cc-metric-store/internal/config"
+)
+
+func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
+
+	// Avro staging is only active for non-JSON checkpoints; release the
+	// WaitGroup slot and bail out so wg.Done() is not called twice
+	// (once here and once via the goroutine's defer), which would panic.
+	if config.Keys.Checkpoints.FileFormat == "json" {
+		wg.Done()
+		return
+	}
+
+	go func() {
+		defer wg.Done()
+
+		var avroLevel *AvroLevel
+		oldSelector := make([]string, 0)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case val := <-LineProtocolMessages:
+				//Fetch the frequency of the metric from the global configuration
+				freq, err := config.Keys.GetMetricFrequency(val.MetricName)
+				if err != nil {
+					fmt.Printf("Error fetching metric frequency: %s\n", err)
+					continue
+				}
+
+				// Create a new selector for the Avro level
+				// The selector is a slice of strings that represents the path to the
+				// Avro level. It is created by appending the cluster, node, and metric
+				// name to the selector.
+				var selector []string
+				selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
+
+				if !testEq(oldSelector, selector) {
+					// Get the Avro level for the metric
+					avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
+
+					// findAvroLevelOrCreate never returns nil; log if that invariant breaks
+					if avroLevel == nil {
+						fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
+					}
+					oldSelector = append(oldSelector[:0], selector...)
+				}
+
+				avroLevel.addMetric(val.MetricName, val.Value, val.Timestamp)
+			}
+		}
+	}()
+}
+
+func testEq(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
diff --git a/internal/avro/avroStruct.go b/internal/avro/avroStruct.go
new file mode 100644
index 0000000..27f9876
--- /dev/null
+++ b/internal/avro/avroStruct.go
@@ -0,0 +1,109 @@
+package avro
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/ClusterCockpit/cc-metric-store/internal/util"
+)
+
+var LineProtocolMessages = make(chan AvroStruct, 128)
+
+var AvroCounter = 0
+
+// CheckpointBufferMinutes should always be in minutes.
+// It controls the amount of data to hold for given amount of time.
+var CheckpointBufferMinutes = 3
+
+type AvroStruct struct {
+	MetricName string
+	Cluster    string
+	Node       string
+	Selector   []string
+	Value      util.Float
+	Timestamp  int64
+}
+
+type AvroStore struct {
+	root AvroLevel
+}
+
+var avroStore AvroStore
+
+type AvroLevel struct {
+	children map[string]*AvroLevel
+	data     map[int64]map[string]util.Float
+	lock     sync.RWMutex
+}
+
+func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel {
+	if len(selector) == 0 {
+		return l
+	}
+
+	// Allow concurrent reads:
+	l.lock.RLock()
+	var child *AvroLevel
+	var ok bool
+	if l.children == nil {
+		// Children map needs to be created...
+		l.lock.RUnlock()
+	} else {
+		child, ok = l.children[selector[0]]
+		l.lock.RUnlock()
+		if ok {
+			return child.findAvroLevelOrCreate(selector[1:])
+		}
+	}
+
+	// The level does not exist, take write lock for unique access:
+	l.lock.Lock()
+	// While this thread waited for the write lock, another thread
+	// could have created the child node.
+	if l.children != nil {
+		child, ok = l.children[selector[0]]
+		if ok {
+			l.lock.Unlock()
+			return child.findAvroLevelOrCreate(selector[1:])
+		}
+	}
+
+	child = &AvroLevel{
+		data:     make(map[int64]map[string]util.Float, 0),
+		children: nil,
+	}
+
+	if l.children != nil {
+		l.children[selector[0]] = child
+	} else {
+		l.children = map[string]*AvroLevel{selector[0]: child}
+	}
+	l.lock.Unlock()
+	return child.findAvroLevelOrCreate(selector[1:])
+}
+
+func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	// Create a key value for the first time
+	if len(l.data) == 0 {
+		l.data[timestamp] = make(map[string]util.Float, 0)
+		l.data[timestamp][metricName] = value
+		fmt.Printf("Creating new timestamp because no data exists\n")
+	}
+
+	// Iterate over timestamps and choose the one which is within range.
+	// Since it's epoch time, we check if the difference is less than 60 seconds.
+	for ts := range l.data {
+		if timestamp >= ts && timestamp < ts+60 {
+			l.data[ts][metricName] = value
+			return
+		}
+	}
+
+	// Create a new timestamp if none is found
+	l.data[timestamp] = make(map[string]util.Float, 0)
+	l.data[timestamp][metricName] = value
+
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index 4ae3278..0469924 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -81,9 +81,10 @@ type Config struct {
 	Metrics     map[string]MetricConfig `json:"metrics"`
 	HttpConfig  *HttpConfig             `json:"http-api"`
 	Checkpoints struct {
-		Interval string `json:"interval"`
-		RootDir  string `json:"directory"`
-		Restore  string `json:"restore"`
+		FileFormat string `json:"file-format"`
+		Interval   string `json:"interval"`
+		RootDir    string `json:"directory"`
+		Restore    string `json:"restore"`
 	} `json:"checkpoints"`
 	Debug struct {
 		DumpToFile string `json:"dump-to-file"`
@@ -113,3 +114,10 @@ func Init(file string) {
 		log.Fatal(err)
 	}
 }
+
+func (c *Config) GetMetricFrequency(metricName string) (int64, error) {
+	if metric, ok := c.Metrics[metricName]; ok {
+		return metric.Frequency, nil
+	}
+	return 0, fmt.Errorf("metric %s not found", metricName)
+}
diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go
index c607c04..afd6cf3 100644
--- a/internal/memorystore/checkpoint.go
+++ b/internal/memorystore/checkpoint.go
@@ -19,6 +19,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
 	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/util"
 )
@@ -40,43 +41,78 @@ type CheckpointFile struct {
 var lastCheckpoint time.Time
 
 func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
-	lastCheckpoint = time.Now()
-	ms := GetMemoryStore()
+	if config.Keys.Checkpoints.FileFormat == "json" {
+		lastCheckpoint = time.Now()
+		ms := GetMemoryStore()
 
-	go func() {
-		defer wg.Done()
-		d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
-		if err != nil {
-			log.Fatal(err)
-		}
-		if d <= 0 {
-			return
-		}
-
-		ticks := func() <-chan time.Time {
-			if d <= 0 {
-				return nil
+		go func() {
+			defer wg.Done()
+			d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
+			if err != nil {
+				log.Fatal(err)
+			}
+			if d <= 0 {
+				return
+			}
+
+			ticks := func() <-chan time.Time {
+				if d <= 0 {
+					return nil
+				}
+				return time.NewTicker(d).C
+			}()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticks:
+					log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
+					now := time.Now()
+					n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
+						lastCheckpoint.Unix(), now.Unix())
+					if err != nil {
+						log.Printf("checkpointing failed: %s\n", err.Error())
+					} else {
+						log.Printf("done: %d checkpoint files created\n", n)
+						lastCheckpoint = now
+					}
+				}
 			}
-			return time.NewTicker(d).C
 		}()
-		for {
+	} else {
+		go func() {
+			defer wg.Done()
+			d, err := time.ParseDuration("1m")
+			if err != nil {
+				log.Fatal(err)
+			}
+			if d <= 0 {
+				return
+			}
+
 			select {
 			case <-ctx.Done():
 				return
-			case <-ticks:
-				log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
-				now := time.Now()
-				n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
-					lastCheckpoint.Unix(), now.Unix())
-				if err != nil {
-					log.Printf("checkpointing failed: %s\n", err.Error())
-				} else {
-					log.Printf("done: %d checkpoint files created\n", n)
-					lastCheckpoint = now
+			case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute):
+				// This is the first tick until we collect the data for given minutes.
+			}
+
+			ticks := func() <-chan time.Time {
+				if d <= 0 {
+					return nil
 				}
+				return time.NewTicker(d).C
+			}()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticks:
+					// Regular ticks of 1 minute to write data.
 				}
 			}
-		}
-	}()
+		}()
+	}
 }
 
 // As `Float` implements a custom MarshalJSON() function,