Initial commit for staging changes

This commit is contained in:
Aditya Ujeniya 2025-04-17 09:55:09 +02:00
parent e1c5ded933
commit b53832a055
6 changed files with 272 additions and 32 deletions

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/api"
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
"github.com/ClusterCockpit/cc-metric-store/internal/runtimeEnv"
@ -95,6 +96,7 @@ func main() {
memorystore.Retention(&wg, ctx)
memorystore.Checkpointing(&wg, ctx)
memorystore.Archiving(&wg, ctx)
avro.DataStaging(&wg, ctx)
r := http.NewServeMux()
api.MountRoutes(r)
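
A note on the wiring above: DataStaging releases exactly one WaitGroup slot, either immediately when checkpoints stay in JSON format or via the staging goroutine's deferred Done, so the caller has to add one slot for it just like for the other background services. A minimal sketch of that wiring, written as if it sat next to the existing main package; the Add count and the shutdown handling are assumptions because they lie outside this hunk:

package main

import (
	"context"
	"sync"

	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
	"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
)

// startBackgroundServices is an illustrative sketch, not the actual main().
func startBackgroundServices() (*sync.WaitGroup, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(4) // assumption: one slot per service started below

	memorystore.Retention(&wg, ctx)
	memorystore.Checkpointing(&wg, ctx)
	memorystore.Archiving(&wg, ctx)
	avro.DataStaging(&wg, ctx)

	// The caller cancels the context on shutdown and then calls wg.Wait().
	return &wg, cancel
}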

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
@ -197,6 +198,8 @@ func decodeLine(dec *lineprotocol.Decoder,
var lvl *memorystore.Level = nil
prevCluster, prevHost := "", ""
var AvroBuf avro.AvroStruct
var ok bool
for dec.Next() {
rawmeasurement, err := dec.Measurement()
@ -329,6 +332,17 @@ func decodeLine(dec *lineprotocol.Decoder,
return fmt.Errorf("host %s: timestamp : %#v with error : %#v", host, t, err.Error())
}
if config.Keys.Checkpoints.FileFormat != "json" {
AvroBuf.MetricName = string(metricBuf)
AvroBuf.Cluster = cluster
AvroBuf.Node = host
AvroBuf.Selector = selector
AvroBuf.Value = metric.Value
AvroBuf.Timestamp = t.Unix()
avro.LineProtocolMessages <- AvroBuf
}
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
return err
}
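
For readers unfamiliar with the staging path added here: when the checkpoint file format is not "json", every decoded sample is additionally wrapped in an avro.AvroStruct and handed to the Avro staging goroutine over a channel. A standalone sketch of that hand-off, assuming it sits in the same package as decodeLine; the metric, cluster, node and selector values are made up:

package api

import (
	"time"

	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
	"github.com/ClusterCockpit/cc-metric-store/internal/util"
)

// stageOneSample mirrors what decodeLine does above for a single decoded sample.
func stageOneSample() {
	msg := avro.AvroStruct{
		MetricName: "cpu_load",
		Cluster:    "clusterA",
		Node:       "node01",
		Selector:   []string{"clusterA", "node01"},
		Value:      util.Float(42.0),
		Timestamp:  time.Now().Unix(),
	}

	// LineProtocolMessages is unbuffered (make(chan AvroStruct)), so this send
	// blocks until the DataStaging goroutine is ready to receive it.
	avro.LineProtocolMessages <- msg
}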

View File

@ -0,0 +1,71 @@
package avro

import (
	"context"
	"fmt"
	"strconv"
	"sync"

	"github.com/ClusterCockpit/cc-metric-store/internal/config"
)

func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
	// Avro staging is only needed when checkpoints are not written as JSON.
	if config.Keys.Checkpoints.FileFormat == "json" {
		wg.Done()
		return
	}

	go func() {
		defer wg.Done()

		var avroLevel *AvroLevel
		oldSelector := make([]string, 0)

		for {
			select {
			case <-ctx.Done():
				return
			case val := <-LineProtocolMessages:
				// Fetch the frequency of the metric from the global configuration.
				freq, err := config.Keys.GetMetricFrequency(val.MetricName)
				if err != nil {
					fmt.Printf("Error fetching metric frequency: %s\n", err)
					continue
				}

				// The selector is the path to the Avro level:
				// cluster -> node -> metric frequency.
				var selector []string
				selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))

				if !testEq(oldSelector, selector) {
					// Get (or create) the Avro level for this selector.
					avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
					if avroLevel == nil {
						fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
						continue
					}
					oldSelector = append(oldSelector[:0], selector...)
				}

				avroLevel.addMetric(val.MetricName, val.Value, val.Timestamp)
			}
		}
	}()
}

func testEq(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
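
testEq is a plain element-wise comparison of two string slices. If the module targets Go 1.21 or newer (an assumption, not something this commit states), the standard library's slices.Equal provides the same semantics and could replace it:

package avro

import "slices" // slices.Equal requires Go 1.21+

// sameSelector is an illustrative drop-in replacement for testEq.
func sameSelector(a, b []string) bool {
	return slices.Equal(a, b)
}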

internal/avro/avroStruct.go (new file, 109 lines)
View File

@ -0,0 +1,109 @@
package avro

import (
	"fmt"
	"sync"

	"github.com/ClusterCockpit/cc-metric-store/internal/util"
)

var LineProtocolMessages = make(chan AvroStruct)
var AvroCounter = 0

// CheckpointBufferMinutes is expressed in minutes.
// It controls how long incoming data is buffered before it is written out.
var CheckpointBufferMinutes = 3

type AvroStruct struct {
	MetricName string
	Cluster    string
	Node       string
	Selector   []string
	Value      util.Float
	Timestamp  int64
}

type AvroStore struct {
	root AvroLevel
}

var avroStore AvroStore

type AvroLevel struct {
	children map[string]*AvroLevel
	data     map[int64]map[string]util.Float
	lock     sync.RWMutex
}

func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel {
	if len(selector) == 0 {
		return l
	}

	// Allow concurrent reads:
	l.lock.RLock()
	var child *AvroLevel
	var ok bool
	if l.children == nil {
		// Children map needs to be created...
		l.lock.RUnlock()
	} else {
		child, ok = l.children[selector[0]]
		l.lock.RUnlock()
		if ok {
			return child.findAvroLevelOrCreate(selector[1:])
		}
	}

	// The level does not exist, take write lock for unique access:
	l.lock.Lock()
	// While this thread waited for the write lock, another thread
	// could have created the child node.
	if l.children != nil {
		child, ok = l.children[selector[0]]
		if ok {
			l.lock.Unlock()
			return child.findAvroLevelOrCreate(selector[1:])
		}
	}

	child = &AvroLevel{
		data:     make(map[int64]map[string]util.Float, 0),
		children: nil,
	}

	if l.children != nil {
		l.children[selector[0]] = child
	} else {
		l.children = map[string]*AvroLevel{selector[0]: child}
	}
	l.lock.Unlock()
	return child.findAvroLevelOrCreate(selector[1:])
}

func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64) {
	l.lock.Lock()
	defer l.lock.Unlock()

	// First metric seen on this level: create the initial timestamp bucket.
	if len(l.data) == 0 {
		fmt.Printf("Creating new timestamp because no data exists\n")
		l.data[timestamp] = make(map[string]util.Float, 0)
		l.data[timestamp][metricName] = value
		return
	}

	// Iterate over the existing timestamp buckets and reuse one if the new
	// sample falls within 60 seconds of it (timestamps are epoch seconds).
	for ts := range l.data {
		if timestamp >= ts && timestamp-ts < 60 {
			l.data[ts][metricName] = value
			return
		}
	}

	// No bucket within range: start a new one.
	l.data[timestamp] = make(map[string]util.Float, 0)
	l.data[timestamp][metricName] = value
}
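
To make the level tree and the 60-second bucketing concrete, here is a minimal sketch of a test that could sit in package avro; the file name and the selector values are made up. Two samples taken 30 seconds apart end up in the same timestamp bucket of the same level:

package avro

import (
	"testing"

	"github.com/ClusterCockpit/cc-metric-store/internal/util"
)

// TestBucketingSketch is illustrative only.
func TestBucketingSketch(t *testing.T) {
	// Use a fresh root instead of the package-level avroStore so the
	// sketch does not touch global state.
	root := &AvroLevel{}

	// Selector layout as built by DataStaging: cluster -> node -> frequency.
	lvl := root.findAvroLevelOrCreate([]string{"clusterA", "node01", "60"})

	base := int64(1_700_000_000)
	lvl.addMetric("cpu_load", util.Float(1.0), base)
	lvl.addMetric("mem_used", util.Float(2.0), base+30) // within 60s of base

	if len(lvl.data) != 1 {
		t.Fatalf("expected one timestamp bucket, got %d", len(lvl.data))
	}
	if len(lvl.data[base]) != 2 {
		t.Fatalf("expected two metrics in the bucket, got %d", len(lvl.data[base]))
	}
}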

View File

@ -81,9 +81,10 @@ type Config struct {
Metrics map[string]MetricConfig `json:"metrics"`
HttpConfig *HttpConfig `json:"http-api"`
Checkpoints struct {
FileFormat string `json:"file-format"`
Interval   string `json:"interval"`
RootDir    string `json:"directory"`
Restore    string `json:"restore"`
} `json:"checkpoints"`
Debug struct {
DumpToFile string `json:"dump-to-file"`
@ -113,3 +114,10 @@ func Init(file string) {
log.Fatal(err)
}
}
func (c *Config) GetMetricFrequency(metricName string) (int64, error) {
if metric, ok := c.Metrics[metricName]; ok {
return metric.Frequency, nil
}
return 0, fmt.Errorf("metric %s not found", metricName)
}
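
GetMetricFrequency is what DataStaging uses to build the last element of its selector. A short usage sketch, written as if it sat next to the existing main package; it assumes a config file has already been loaded via Init and that a metric named "cpu_load" is configured (both assumptions):

package main

import (
	"fmt"
	"log"

	"github.com/ClusterCockpit/cc-metric-store/internal/config"
)

func printFrequency() {
	// Assumes config.Init("config.json") has been called by main().
	freq, err := config.Keys.GetMetricFrequency("cpu_load")
	if err != nil {
		log.Fatal(err)
	}

	// DataStaging turns this value into the last selector element via
	// strconv.FormatInt(freq, 10).
	fmt.Printf("cpu_load frequency: %d\n", freq)
}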

View File

@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
)
@ -40,43 +41,78 @@ type CheckpointFile struct {
var lastCheckpoint time.Time
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
	if config.Keys.Checkpoints.FileFormat == "json" {
		lastCheckpoint = time.Now()
		ms := GetMemoryStore()

		go func() {
			defer wg.Done()
			d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
			if err != nil {
				log.Fatal(err)
			}
			if d <= 0 {
				return
			}

			ticks := func() <-chan time.Time {
				if d <= 0 {
					return nil
				}
				return time.NewTicker(d).C
			}()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticks:
					log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
					now := time.Now()
					n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
						lastCheckpoint.Unix(), now.Unix())
					if err != nil {
						log.Printf("checkpointing failed: %s\n", err.Error())
					} else {
						log.Printf("done: %d checkpoint files created\n", n)
						lastCheckpoint = now
					}
				}
			}
		}()
	} else {
		go func() {
			defer wg.Done()
			d, err := time.ParseDuration("1m")
			if err != nil {
				log.Fatal(err)
			}
			if d <= 0 {
				return
			}

			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute):
				// This is the first tick; wait until data has been collected
				// for the configured number of buffer minutes.
			}

			ticks := func() <-chan time.Time {
				if d <= 0 {
					return nil
				}
				return time.NewTicker(d).C
			}()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticks:
					// Regular ticks of 1 minute to write data.
				}
			}
		}()
	}
}
// As `Float` implements a custom MarshalJSON() function,