Import metric store packages

This commit is contained in:
2025-08-08 14:24:52 +02:00
parent 86453e7e11
commit a50b832c2a
12 changed files with 2802 additions and 0 deletions

View File

@@ -0,0 +1,473 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package avro
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ClusterCockpit/cc-lib/util"
"github.com/linkedin/goavro/v2"
)
var NumWorkers int = 4
var ErrNoNewData error = errors.New("no data in the pool")
func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
levels := make([]*AvroLevel, 0)
selectors := make([][]string, 0)
as.root.lock.RLock()
// Cluster
for sel1, l1 := range as.root.children {
l1.lock.RLock()
// Node
for sel2, l2 := range l1.children {
l2.lock.RLock()
// Frequency
for sel3, l3 := range l2.children {
levels = append(levels, l3)
selectors = append(selectors, []string{sel1, sel2, sel3})
}
l2.lock.RUnlock()
}
l1.lock.RUnlock()
}
as.root.lock.RUnlock()
type workItem struct {
level *AvroLevel
dir string
selector []string
}
n, errs := int32(0), int32(0)
var wg sync.WaitGroup
wg.Add(NumWorkers)
work := make(chan workItem, NumWorkers*2)
for range NumWorkers {
go func() {
defer wg.Done()
for workItem := range work {
var from int64 = getTimestamp(workItem.dir)
if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil {
if err == ErrNoNewData {
continue
}
log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
}
}
}()
}
for i := range len(levels) {
dir := path.Join(dir, path.Join(selectors[i]...))
work <- workItem{
level: levels[i],
dir: dir,
selector: selectors[i],
}
}
close(work)
wg.Wait()
if errs > 0 {
return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n)
}
return int(n), nil
}
// getTimestamp returns the timestamp from the directory name
func getTimestamp(dir string) int64 {
// Extract the resolution and timestamp from the directory name
// The existing avro file will be in epoch timestamp format
// iterate over all the files in the directory and find the maximum timestamp
// and return it
resolution := path.Base(dir)
dir = path.Dir(dir)
files, err := os.ReadDir(dir)
if err != nil {
return 0
}
var maxTs int64 = 0
if len(files) == 0 {
return 0
}
for _, file := range files {
if file.IsDir() {
continue
}
name := file.Name()
if len(name) < 5 || !strings.HasSuffix(name, ".avro") || !strings.HasPrefix(name, resolution+"_") {
continue
}
ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
if err != nil {
fmt.Printf("error while parsing timestamp: %s\n", err.Error())
continue
}
if ts > maxTs {
maxTs = ts
}
}
interval, _ := time.ParseDuration(Keys.Checkpoints.Interval)
updateTime := time.Unix(maxTs, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix()
if updateTime < time.Now().Unix() {
return 0
}
return maxTs
}
func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
l.lock.Lock()
defer l.lock.Unlock()
// fmt.Printf("Checkpointing directory: %s\n", dir)
// filepath contains the resolution
int_res, _ := strconv.Atoi(path.Base(dir))
// find smallest overall timestamp in l.data map and delete it from l.data
var minTs int64 = int64(1<<63 - 1)
for ts, dat := range l.data {
if ts < minTs && len(dat) != 0 {
minTs = ts
}
}
if from == 0 && minTs != int64(1<<63-1) {
from = minTs
}
if from == 0 {
return ErrNoNewData
}
var schema string
var codec *goavro.Codec
record_list := make([]map[string]interface{}, 0)
var f *os.File
filePath := dir + fmt.Sprintf("_%d.avro", from)
var err error
fp_, err_ := os.Stat(filePath)
if errors.Is(err_, os.ErrNotExist) {
err = os.MkdirAll(path.Dir(dir), 0o755)
if err != nil {
return fmt.Errorf("failed to create directory: %v", err)
}
} else if fp_.Size() != 0 {
f, err = os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open existing avro file: %v", err)
}
br := bufio.NewReader(f)
reader, err := goavro.NewOCFReader(br)
if err != nil {
return fmt.Errorf("failed to create OCF reader: %v", err)
}
codec = reader.Codec()
schema = codec.Schema()
f.Close()
}
time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
if dumpAll {
time_ref = time.Now().Unix()
}
// Empty values
if len(l.data) == 0 {
// we checkpoint avro files every 60 seconds
repeat := 60 / int_res
for range repeat {
record_list = append(record_list, make(map[string]interface{}))
}
}
readFlag := true
for ts := range l.data {
flag := false
if ts < time_ref {
data := l.data[ts]
schema_gen, err := generateSchema(data)
if err != nil {
return err
}
flag, schema, err = compareSchema(schema, schema_gen)
if err != nil {
return fmt.Errorf("failed to compare read and generated schema: %v", err)
}
if flag && readFlag && !errors.Is(err_, os.ErrNotExist) {
f.Close()
f, err = os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open Avro file: %v", err)
}
br := bufio.NewReader(f)
ocfReader, err := goavro.NewOCFReader(br)
if err != nil {
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
}
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil {
return fmt.Errorf("failed to read record: %v", err)
}
record_list = append(record_list, record.(map[string]interface{}))
}
f.Close()
err = os.Remove(filePath)
if err != nil {
return fmt.Errorf("failed to delete file: %v", err)
}
readFlag = false
}
codec, err = goavro.NewCodec(schema)
if err != nil {
return fmt.Errorf("failed to create codec after merged schema: %v", err)
}
record_list = append(record_list, generateRecord(data))
delete(l.data, ts)
}
}
if len(record_list) == 0 {
return ErrNoNewData
}
f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644)
if err != nil {
return fmt.Errorf("failed to append new avro file: %v", err)
}
// fmt.Printf("Codec : %#v\n", codec)
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: f,
Codec: codec,
CompressionName: goavro.CompressionDeflateLabel,
})
if err != nil {
return fmt.Errorf("failed to create OCF writer: %v", err)
}
// Append the new record
if err := writer.Append(record_list); err != nil {
return fmt.Errorf("failed to append record: %v", err)
}
f.Close()
return nil
}
func compareSchema(schemaRead, schemaGen string) (bool, string, error) {
var genSchema, readSchema AvroSchema
if schemaRead == "" {
return false, schemaGen, nil
}
// Unmarshal the schema strings into AvroSchema structs
if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil {
return false, "", fmt.Errorf("failed to parse generated schema: %v", err)
}
if err := json.Unmarshal([]byte(schemaRead), &readSchema); err != nil {
return false, "", fmt.Errorf("failed to parse read schema: %v", err)
}
sort.Slice(genSchema.Fields, func(i, j int) bool {
return genSchema.Fields[i].Name < genSchema.Fields[j].Name
})
sort.Slice(readSchema.Fields, func(i, j int) bool {
return readSchema.Fields[i].Name < readSchema.Fields[j].Name
})
// Check if schemas are identical
schemasEqual := true
if len(genSchema.Fields) <= len(readSchema.Fields) {
for i := range genSchema.Fields {
if genSchema.Fields[i].Name != readSchema.Fields[i].Name {
schemasEqual = false
break
}
}
// If schemas are identical, return the read schema
if schemasEqual {
return false, schemaRead, nil
}
}
// Create a map to hold unique fields from both schemas
fieldMap := make(map[string]AvroField)
// Add fields from the read schema
for _, field := range readSchema.Fields {
fieldMap[field.Name] = field
}
// Add or update fields from the generated schema
for _, field := range genSchema.Fields {
fieldMap[field.Name] = field
}
// Create a union schema by collecting fields from the map
var mergedFields []AvroField
for _, field := range fieldMap {
mergedFields = append(mergedFields, field)
}
// Sort fields by name for consistency
sort.Slice(mergedFields, func(i, j int) bool {
return mergedFields[i].Name < mergedFields[j].Name
})
// Create the merged schema
mergedSchema := AvroSchema{
Type: "record",
Name: genSchema.Name,
Fields: mergedFields,
}
// Check if schemas are identical
schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields)
if schemasEqual {
for i := range mergedSchema.Fields {
if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name {
schemasEqual = false
break
}
}
if schemasEqual {
return false, schemaRead, nil
}
}
// Marshal the merged schema back to JSON
mergedSchemaJson, err := json.Marshal(mergedSchema)
if err != nil {
return false, "", fmt.Errorf("failed to marshal merged schema: %v", err)
}
return true, string(mergedSchemaJson), nil
}
func generateSchema(data map[string]util.Float) (string, error) {
// Define the Avro schema structure
schema := map[string]interface{}{
"type": "record",
"name": "DataRecord",
"fields": []map[string]interface{}{},
}
fieldTracker := make(map[string]struct{})
for key := range data {
if _, exists := fieldTracker[key]; !exists {
key = correctKey(key)
field := map[string]interface{}{
"name": key,
"type": "double",
"default": -1.0,
}
schema["fields"] = append(schema["fields"].([]map[string]interface{}), field)
fieldTracker[key] = struct{}{}
}
}
schemaString, err := json.Marshal(schema)
if err != nil {
return "", fmt.Errorf("failed to marshal schema: %v", err)
}
return string(schemaString), nil
}
func generateRecord(data map[string]util.Float) map[string]interface{} {
record := make(map[string]interface{})
// Iterate through each map in data
for key, value := range data {
key = correctKey(key)
// Set the value in the record
record[key] = value.Double()
}
return record
}
func correctKey(key string) string {
// Replace any invalid characters in the key
// For example, replace spaces with underscores
key = strings.ReplaceAll(key, ":", "___")
key = strings.ReplaceAll(key, ".", "__")
return key
}
func ReplaceKey(key string) string {
// Replace any invalid characters in the key
// For example, replace spaces with underscores
key = strings.ReplaceAll(key, "___", ":")
key = strings.ReplaceAll(key, "__", ".")
return key
}

View File

@@ -0,0 +1,79 @@
package avro
import (
"context"
"fmt"
"strconv"
"sync"
)
func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
// AvroPool is a pool of Avro writers.
go func() {
if Keys.Checkpoints.FileFormat == "json" {
wg.Done() // Mark this goroutine as done
return // Exit the goroutine
}
defer wg.Done()
var avroLevel *AvroLevel
oldSelector := make([]string, 0)
for {
select {
case <-ctx.Done():
return
case val := <-LineProtocolMessages:
//Fetch the frequency of the metric from the global configuration
freq, err := Keys.GetMetricFrequency(val.MetricName)
if err != nil {
fmt.Printf("Error fetching metric frequency: %s\n", err)
continue
}
metricName := ""
for _, selector_name := range val.Selector {
metricName += selector_name + Delimiter
}
metricName += val.MetricName
// Create a new selector for the Avro level
// The selector is a slice of strings that represents the path to the
// Avro level. It is created by appending the cluster, node, and metric
// name to the selector.
var selector []string
selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
if !testEq(oldSelector, selector) {
// Get the Avro level for the metric
avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
// If the Avro level is nil, create a new one
if avroLevel == nil {
fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
}
oldSelector = append([]string{}, selector...)
}
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
}
}
}()
}
func testEq(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

163
internal/avro/avroStruct.go Normal file
View File

@@ -0,0 +1,163 @@
package avro
import (
"sync"
"github.com/ClusterCockpit/cc-lib/util"
)
var (
LineProtocolMessages = make(chan *AvroStruct)
Delimiter = "ZZZZZ"
)
// CheckpointBufferMinutes should always be in minutes.
// Its controls the amount of data to hold for given amount of time.
var CheckpointBufferMinutes = 3
type AvroStruct struct {
MetricName string
Cluster string
Node string
Selector []string
Value util.Float
Timestamp int64
}
type AvroStore struct {
root AvroLevel
}
var avroStore AvroStore
type AvroLevel struct {
children map[string]*AvroLevel
data map[int64]map[string]util.Float
lock sync.RWMutex
}
type AvroField struct {
Name string `json:"name"`
Type interface{} `json:"type"`
Default interface{} `json:"default,omitempty"`
}
type AvroSchema struct {
Type string `json:"type"`
Name string `json:"name"`
Fields []AvroField `json:"fields"`
}
func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel {
if len(selector) == 0 {
return l
}
// Allow concurrent reads:
l.lock.RLock()
var child *AvroLevel
var ok bool
if l.children == nil {
// Children map needs to be created...
l.lock.RUnlock()
} else {
child, ok := l.children[selector[0]]
l.lock.RUnlock()
if ok {
return child.findAvroLevelOrCreate(selector[1:])
}
}
// The level does not exist, take write lock for unqiue access:
l.lock.Lock()
// While this thread waited for the write lock, another thread
// could have created the child node.
if l.children != nil {
child, ok = l.children[selector[0]]
if ok {
l.lock.Unlock()
return child.findAvroLevelOrCreate(selector[1:])
}
}
child = &AvroLevel{
data: make(map[int64]map[string]util.Float, 0),
children: nil,
}
if l.children != nil {
l.children[selector[0]] = child
} else {
l.children = map[string]*AvroLevel{selector[0]: child}
}
l.lock.Unlock()
return child.findAvroLevelOrCreate(selector[1:])
}
func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64, Freq int) {
l.lock.Lock()
defer l.lock.Unlock()
KeyCounter := int(CheckpointBufferMinutes * 60 / Freq)
// Create keys in advance for the given amount of time
if len(l.data) != KeyCounter {
if len(l.data) == 0 {
for i := range KeyCounter {
l.data[timestamp+int64(i*Freq)] = make(map[string]util.Float, 0)
}
} else {
// Get the last timestamp
var lastTs int64
for ts := range l.data {
if ts > lastTs {
lastTs = ts
}
}
// Create keys for the next KeyCounter timestamps
l.data[lastTs+int64(Freq)] = make(map[string]util.Float, 0)
}
}
closestTs := int64(0)
minDiff := int64(Freq) + 1 // Start with diff just outside the valid range
found := false
// Iterate over timestamps and choose the one which is within range.
// Since its epoch time, we check if the difference is less than 60 seconds.
for ts, dat := range l.data {
// Check if timestamp is within range
diff := timestamp - ts
if diff < -int64(Freq) || diff > int64(Freq) {
continue
}
// Metric already present at this timestamp — skip
if _, ok := dat[metricName]; ok {
continue
}
// Check if this is the closest timestamp so far
if Abs(diff) < minDiff {
minDiff = Abs(diff)
closestTs = ts
found = true
}
}
if found {
l.data[closestTs][metricName] = value
}
}
func GetAvroStore() *AvroStore {
return &avroStore
}
// Abs returns the absolute value of x.
func Abs(x int64) int64 {
if x < 0 {
return -x
}
return x
}