Merge branch 'devel' into hotfix

This commit is contained in:
2025-07-02 10:02:22 +02:00
24 changed files with 1596 additions and 89 deletions

View File

@@ -122,7 +122,8 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead
return 0, err
}
files, err := findFiles(entries, from, false)
extension := config.Keys.Checkpoints.FileFormat
files, err := findFiles(entries, from, extension, false)
if err != nil {
return 0, err
}

View File

@@ -19,8 +19,10 @@ import (
"sync/atomic"
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/linkedin/goavro/v2"
)
// Whenever changed, update MarshalJSON as well!
@@ -41,42 +43,78 @@ var lastCheckpoint time.Time
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpoint = time.Now()
ms := GetMemoryStore()
go func() {
defer wg.Done()
d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
if err != nil {
log.Fatal(err)
}
if d <= 0 {
return
}
if config.Keys.Checkpoints.FileFormat == "json" {
ms := GetMemoryStore()
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
go func() {
defer wg.Done()
d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
if err != nil {
log.Fatal(err)
}
if d <= 0 {
return
}
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
return time.NewTicker(d).C
}()
for {
select {
case <-ctx.Done():
return
case <-ticks:
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix())
if err != nil {
log.Printf("checkpointing failed: %s\n", err.Error())
} else {
log.Printf("done: %d checkpoint files created\n", n)
lastCheckpoint = now
}
}
}
return time.NewTicker(d).C
}()
for {
} else {
go func() {
defer wg.Done()
d, _ := time.ParseDuration("1m")
select {
case <-ctx.Done():
return
case <-ticks:
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix())
if err != nil {
log.Printf("checkpointing failed: %s\n", err.Error())
} else {
log.Printf("done: %d checkpoint files created\n", n)
lastCheckpoint = now
case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute):
// This is the first tick untill we collect the data for given minutes.
avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
// log.Printf("Checkpointing %d avro files", count)
}
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
return time.NewTicker(d).C
}()
for {
select {
case <-ctx.Done():
return
case <-ticks:
// Regular ticks of 1 minute to write data.
avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
// log.Printf("Checkpointing %d avro files", count)
}
}
}
}()
}()
}
}
// As `Float` implements a custom MarshalJSON() function,
@@ -264,19 +302,7 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
return bw.Flush()
}
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
// This function can only be called once and before the very first write or read.
// Different host's data is loaded to memory in parallel.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
if _, err := os.Stat(dir); os.IsNotExist(err) {
// The directory does not exist, so create it using os.MkdirAll()
err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory
if err != nil {
log.Fatalf("Error creating directory: %#v\n", err)
}
fmt.Printf("%#v Directory created successfully.\n", dir)
}
func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) {
var wg sync.WaitGroup
work := make(chan [2]string, NumWorkers)
n, errs := int32(0), int32(0)
@@ -287,7 +313,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
defer wg.Done()
for host := range work {
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension)
if err != nil {
log.Fatalf("error while loading checkpoints: %s", err.Error())
atomic.AddInt32(&errs, 1)
@@ -344,6 +370,234 @@ done:
return int(n), nil
}
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
// This function can only be called once and before the very first write or read.
// Different host's data is loaded to memory in parallel.
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
if _, err := os.Stat(dir); os.IsNotExist(err) {
// The directory does not exist, so create it using os.MkdirAll()
err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory
if err != nil {
log.Fatalf("Error creating directory: %#v\n", err)
}
fmt.Printf("%#v Directory created successfully.\n", dir)
}
// Config read (replace with your actual config read)
fileFormat := config.Keys.Checkpoints.FileFormat
if fileFormat == "" {
fileFormat = "avro"
}
// Map to easily get the fallback format
oppositeFormat := map[string]string{
"json": "avro",
"avro": "json",
}
// First, attempt to load the specified format
if found, err := checkFilesWithExtension(dir, fileFormat); err != nil {
return 0, fmt.Errorf("error checking files with extension: %v", err)
} else if found {
log.Printf("Loading %s files because fileformat is %s\n", fileFormat, fileFormat)
return m.FromCheckpoint(dir, from, fileFormat)
}
// If not found, attempt the opposite format
altFormat := oppositeFormat[fileFormat]
if found, err := checkFilesWithExtension(dir, altFormat); err != nil {
return 0, fmt.Errorf("error checking files with extension: %v", err)
} else if found {
log.Printf("Loading %s files but fileformat is %s\n", altFormat, fileFormat)
return m.FromCheckpoint(dir, from, altFormat)
}
log.Println("No valid checkpoint files found in the directory.")
return 0, nil
}
func checkFilesWithExtension(dir string, extension string) (bool, error) {
found := false
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error accessing path %s: %v", path, err)
}
if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension {
found = true
return nil
}
return nil
})
if err != nil {
return false, fmt.Errorf("error walking through directories: %s", err)
}
return found, nil
}
func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
br := bufio.NewReader(f)
fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:]
resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64)
if err != nil {
return fmt.Errorf("error while reading avro file (resolution parsing) : %s", err)
}
from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64)
// Same logic according to lineprotocol
from_timestamp -= (resolution / 2)
if err != nil {
return fmt.Errorf("error converting timestamp from the avro file : %s", err)
}
// fmt.Printf("File : %s with resolution : %d\n", fileName, resolution)
var recordCounter int64 = 0
// Create a new OCF reader from the buffered reader
ocfReader, err := goavro.NewOCFReader(br)
if err != nil {
panic(err)
}
metricsData := make(map[string]util.FloatArray)
for ocfReader.Scan() {
datum, err := ocfReader.Read()
if err != nil {
return fmt.Errorf("error while reading avro file : %s", err)
}
record, ok := datum.(map[string]interface{})
if !ok {
panic("failed to assert datum as map[string]interface{}")
}
for key, value := range record {
metricsData[key] = append(metricsData[key], util.ConvertToFloat(value.(float64)))
}
recordCounter += 1
}
to := (from_timestamp + (recordCounter / (60 / resolution) * 60))
if to < from {
return nil
}
for key, floatArray := range metricsData {
metricName := avro.ReplaceKey(key)
if strings.Contains(metricName, avro.Delimiter) {
subString := strings.Split(metricName, avro.Delimiter)
lvl := l
for i := 0; i < len(subString)-1; i++ {
sel := subString[i]
if lvl.children == nil {
lvl.children = make(map[string]*Level)
}
child, ok := lvl.children[sel]
if !ok {
child = &Level{
metrics: make([]*buffer, len(m.Metrics)),
children: nil,
}
lvl.children[sel] = child
}
lvl = child
}
leafMetricName := subString[len(subString)-1]
err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution)
if err != nil {
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
}
} else {
err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution)
if err != nil {
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
}
}
}
return nil
}
func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray util.FloatArray, from int64, resolution int64) error {
n := len(floatArray)
b := &buffer{
frequency: resolution,
start: from,
data: floatArray[0:n:n],
prev: nil,
next: nil,
archived: true,
}
b.close()
minfo, ok := m.Metrics[metricName]
if !ok {
return nil
// return errors.New("Unkown metric: " + name)
}
prev := l.metrics[minfo.Offset]
if prev == nil {
l.metrics[minfo.Offset] = b
} else {
if prev.start > b.start {
return errors.New("wooops")
}
b.prev = prev
prev.next = b
missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency))
if missingCount > 0 {
missingCount /= int(b.frequency)
for range missingCount {
prev.data = append(prev.data, util.NaN)
}
prev.data = prev.data[0:len(prev.data):len(prev.data)]
}
}
l.metrics[minfo.Offset] = b
return nil
}
func (l *Level) loadJsonFile(m *MemoryStore, f *os.File, from int64) error {
br := bufio.NewReader(f)
cf := &CheckpointFile{}
if err := json.NewDecoder(br).Decode(cf); err != nil {
return err
}
if cf.To != 0 && cf.To < from {
return nil
}
if err := l.loadFile(cf, m); err != nil {
return err
}
return nil
}
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
for name, metric := range cf.Metrics {
n := len(metric.Data)
@@ -399,7 +653,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
return nil
}
func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
@@ -409,7 +663,7 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
return 0, err
}
jsonFiles := make([]fs.DirEntry, 0)
allFiles := make([]fs.DirEntry, 0)
filesLoaded := 0
for _, e := range direntries {
if e.IsDir() {
@@ -418,25 +672,32 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
children: make(map[string]*Level),
}
files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension)
filesLoaded += files
if err != nil {
return filesLoaded, err
}
l.children[e.Name()] = child
} else if strings.HasSuffix(e.Name(), ".json") {
jsonFiles = append(jsonFiles, e)
} else if strings.HasSuffix(e.Name(), "."+extension) {
allFiles = append(allFiles, e)
} else {
return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name())
continue
}
}
files, err := findFiles(jsonFiles, from, true)
files, err := findFiles(allFiles, from, extension, true)
if err != nil {
return filesLoaded, err
}
loaders := map[string]func(*MemoryStore, *os.File, int64) error{
"json": l.loadJsonFile,
"avro": l.loadAvroFile,
}
loader := loaders[extension]
for _, filename := range files {
f, err := os.Open(path.Join(dir, filename))
if err != nil {
@@ -444,17 +705,7 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
}
defer f.Close()
br := bufio.NewReader(f)
cf := &CheckpointFile{}
if err = json.NewDecoder(br).Decode(cf); err != nil {
return filesLoaded, err
}
if cf.To != 0 && cf.To < from {
continue
}
if err = l.loadFile(cf, m); err != nil {
if err = loader(m, f, from); err != nil {
return filesLoaded, err
}
@@ -467,10 +718,14 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
// This will probably get very slow over time!
// A solution could be some sort of an index file in which all other files
// and the timespan they contain is listed.
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) {
nums := map[string]int64{}
for _, e := range direntries {
ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
if !strings.HasSuffix(e.Name(), "."+extension) {
continue
}
ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,88 @@
package memorystore
import (
"bufio"
"fmt"
"time"
)
// This is a threshold that allows a node to be healthy with certain number of data points missing.
// Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a
// node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy.
const MaxMissingDataPoints int64 = 5
// This is a threshold which allows upto certain number of metrics in a node to be unhealthly.
// Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last
// MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does
// not receive data for MaxMissingDataPoints data points will deem the node unhealthy.
const MaxUnhealthyMetrics int64 = 5
func (b *buffer) healthCheck() int64 {
// Check if the buffer is empty
if b.data == nil {
return 1
}
buffer_end := b.start + b.frequency*int64(len(b.data))
t := time.Now().Unix()
// Check if the buffer is too old
if t-buffer_end > MaxMissingDataPoints*b.frequency {
return 1
}
return 0
}
func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) {
l.lock.RLock()
defer l.lock.RUnlock()
for _, mc := range m.Metrics {
if b := l.metrics[mc.Offset]; b != nil {
count += b.healthCheck()
}
}
for _, lvl := range l.children {
c, err := lvl.healthCheck(m, 0)
if err != nil {
return 0, err
}
count += c
}
return count, nil
}
func (m *MemoryStore) HealthCheck(w *bufio.Writer, selector []string) error {
lvl := m.root.findLevel(selector)
if lvl == nil {
return fmt.Errorf("not found: %#v", selector)
}
buf := make([]byte, 0, 25)
// buf = append(buf, "{"...)
var count int64 = 0
unhealthyMetricsCount, err := lvl.healthCheck(m, count)
if err != nil {
return err
}
if unhealthyMetricsCount < MaxUnhealthyMetrics {
buf = append(buf, "Healthy"...)
} else {
buf = append(buf, "Unhealthy"...)
}
// buf = append(buf, "}\n"...)
if _, err = w.Write(buf); err != nil {
return err
}
return w.Flush()
}

View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
@@ -76,13 +77,94 @@ func GetMemoryStore() *MemoryStore {
}
func Shutdown() {
ms := GetMemoryStore()
log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)
files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
var files int
var err error
ms := GetMemoryStore()
if config.Keys.Checkpoints.FileFormat == "json" {
files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
} else {
files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
close(avro.LineProtocolMessages)
}
if err != nil {
log.Printf("Writing checkpoint failed: %s\n", err.Error())
}
log.Printf("Done! (%d files written)\n", files)
// ms.PrintHeirarchy()
}
// func (m *MemoryStore) PrintHeirarchy() {
// m.root.lock.Lock()
// defer m.root.lock.Unlock()
// fmt.Printf("Root : \n")
// for lvl1, sel1 := range m.root.children {
// fmt.Printf("\t%s\n", lvl1)
// for lvl2, sel2 := range sel1.children {
// fmt.Printf("\t\t%s\n", lvl2)
// if lvl1 == "fritz" && lvl2 == "f0201" {
// for name, met := range m.Metrics {
// mt := sel2.metrics[met.Offset]
// fmt.Printf("\t\t\t\t%s\n", name)
// fmt.Printf("\t\t\t\t")
// for mt != nil {
// // if name == "cpu_load" {
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
// // }
// mt = mt.prev
// }
// fmt.Printf("\n")
// }
// }
// for lvl3, sel3 := range sel2.children {
// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" {
// fmt.Printf("\t\t\t\t\t%s\n", lvl3)
// for name, met := range m.Metrics {
// mt := sel3.metrics[met.Offset]
// fmt.Printf("\t\t\t\t\t\t%s\n", name)
// fmt.Printf("\t\t\t\t\t\t")
// for mt != nil {
// // if name == "clock" {
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
// mt = mt.prev
// }
// fmt.Printf("\n")
// }
// // for i, _ := range sel3.metrics {
// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i))
// // }
// }
// }
// }
// }
// }
func getName(m *MemoryStore, i int) string {
for key, val := range m.Metrics {
if val.Offset == i {
return key
}
}
return ""
}
func Retention(wg *sync.WaitGroup, ctx context.Context) {