From a50b832c2ac2e31387100b9b18cc42b62d859ebe Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 8 Aug 2025 14:24:52 +0200 Subject: [PATCH] Import metric store packages --- internal/avro/avroCheckpoint.go | 473 +++++++++++++++++ internal/avro/avroHelper.go | 79 +++ internal/avro/avroStruct.go | 163 ++++++ internal/memorystore/archive.go | 190 +++++++ internal/memorystore/buffer.go | 233 +++++++++ internal/memorystore/checkpoint.go | 764 ++++++++++++++++++++++++++++ internal/memorystore/config.go | 26 + internal/memorystore/debug.go | 107 ++++ internal/memorystore/healthcheck.go | 88 ++++ internal/memorystore/level.go | 187 +++++++ internal/memorystore/memorystore.go | 372 ++++++++++++++ internal/memorystore/stats.go | 120 +++++ 12 files changed, 2802 insertions(+) create mode 100644 internal/avro/avroCheckpoint.go create mode 100644 internal/avro/avroHelper.go create mode 100644 internal/avro/avroStruct.go create mode 100644 internal/memorystore/archive.go create mode 100644 internal/memorystore/buffer.go create mode 100644 internal/memorystore/checkpoint.go create mode 100644 internal/memorystore/config.go create mode 100644 internal/memorystore/debug.go create mode 100644 internal/memorystore/healthcheck.go create mode 100644 internal/memorystore/level.go create mode 100644 internal/memorystore/memorystore.go create mode 100644 internal/memorystore/stats.go diff --git a/internal/avro/avroCheckpoint.go b/internal/avro/avroCheckpoint.go new file mode 100644 index 0000000..4a3cf19 --- /dev/null +++ b/internal/avro/avroCheckpoint.go @@ -0,0 +1,473 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
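+//
+// Package avro buffers recent metric updates and flushes them to Avro OCF
+// checkpoint files named "<resolution>_<timestamp>.avro", one file per
+// cluster/node/frequency leaf, using up to NumWorkers writer goroutines.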
+package avro + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "path" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ClusterCockpit/cc-lib/util" + "github.com/linkedin/goavro/v2" +) + +var NumWorkers int = 4 + +var ErrNoNewData error = errors.New("no data in the pool") + +func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { + levels := make([]*AvroLevel, 0) + selectors := make([][]string, 0) + as.root.lock.RLock() + // Cluster + for sel1, l1 := range as.root.children { + l1.lock.RLock() + // Node + for sel2, l2 := range l1.children { + l2.lock.RLock() + // Frequency + for sel3, l3 := range l2.children { + levels = append(levels, l3) + selectors = append(selectors, []string{sel1, sel2, sel3}) + } + l2.lock.RUnlock() + } + l1.lock.RUnlock() + } + as.root.lock.RUnlock() + + type workItem struct { + level *AvroLevel + dir string + selector []string + } + + n, errs := int32(0), int32(0) + + var wg sync.WaitGroup + wg.Add(NumWorkers) + work := make(chan workItem, NumWorkers*2) + for range NumWorkers { + go func() { + defer wg.Done() + + for workItem := range work { + var from int64 = getTimestamp(workItem.dir) + + if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil { + if err == ErrNoNewData { + continue + } + + log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } + } + }() + } + + for i := range len(levels) { + dir := path.Join(dir, path.Join(selectors[i]...)) + work <- workItem{ + level: levels[i], + dir: dir, + selector: selectors[i], + } + } + + close(work) + wg.Wait() + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +// getTimestamp returns the timestamp from the directory name +func getTimestamp(dir string) int64 { + // Extract the resolution and timestamp from the directory name + // The existing avro file will be in epoch timestamp format + // iterate over all the files in the directory and find the maximum timestamp + // and return it + + resolution := path.Base(dir) + dir = path.Dir(dir) + + files, err := os.ReadDir(dir) + if err != nil { + return 0 + } + var maxTs int64 = 0 + + if len(files) == 0 { + return 0 + } + + for _, file := range files { + if file.IsDir() { + continue + } + name := file.Name() + + if len(name) < 5 || !strings.HasSuffix(name, ".avro") || !strings.HasPrefix(name, resolution+"_") { + continue + } + + ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64) + if err != nil { + fmt.Printf("error while parsing timestamp: %s\n", err.Error()) + continue + } + + if ts > maxTs { + maxTs = ts + } + } + + interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) + updateTime := time.Unix(maxTs, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() + + if updateTime < time.Now().Unix() { + return 0 + } + + return maxTs +} + +func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { + l.lock.Lock() + defer l.lock.Unlock() + + // fmt.Printf("Checkpointing directory: %s\n", dir) + // filepath contains the resolution + int_res, _ := strconv.Atoi(path.Base(dir)) + + // find smallest overall timestamp in l.data map and delete it from l.data + var minTs int64 = int64(1<<63 - 1) + for ts, dat := range l.data { + if ts < minTs && len(dat) != 0 { + minTs = ts + } + } + + if from == 0 && minTs != 
int64(1<<63-1) { + from = minTs + } + + if from == 0 { + return ErrNoNewData + } + + var schema string + var codec *goavro.Codec + record_list := make([]map[string]interface{}, 0) + + var f *os.File + + filePath := dir + fmt.Sprintf("_%d.avro", from) + + var err error + + fp_, err_ := os.Stat(filePath) + if errors.Is(err_, os.ErrNotExist) { + err = os.MkdirAll(path.Dir(dir), 0o755) + if err != nil { + return fmt.Errorf("failed to create directory: %v", err) + } + } else if fp_.Size() != 0 { + f, err = os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open existing avro file: %v", err) + } + + br := bufio.NewReader(f) + + reader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader: %v", err) + } + codec = reader.Codec() + schema = codec.Schema() + + f.Close() + } + + time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() + + if dumpAll { + time_ref = time.Now().Unix() + } + + // Empty values + if len(l.data) == 0 { + // we checkpoint avro files every 60 seconds + repeat := 60 / int_res + + for range repeat { + record_list = append(record_list, make(map[string]interface{})) + } + } + + readFlag := true + + for ts := range l.data { + flag := false + if ts < time_ref { + data := l.data[ts] + + schema_gen, err := generateSchema(data) + if err != nil { + return err + } + + flag, schema, err = compareSchema(schema, schema_gen) + if err != nil { + return fmt.Errorf("failed to compare read and generated schema: %v", err) + } + if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { + + f.Close() + + f, err = os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open Avro file: %v", err) + } + + br := bufio.NewReader(f) + + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) + } + + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("failed to read record: %v", err) + } + + record_list = append(record_list, record.(map[string]interface{})) + } + + f.Close() + + err = os.Remove(filePath) + if err != nil { + return fmt.Errorf("failed to delete file: %v", err) + } + + readFlag = false + } + codec, err = goavro.NewCodec(schema) + if err != nil { + return fmt.Errorf("failed to create codec after merged schema: %v", err) + } + + record_list = append(record_list, generateRecord(data)) + delete(l.data, ts) + } + } + + if len(record_list) == 0 { + return ErrNoNewData + } + + f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644) + if err != nil { + return fmt.Errorf("failed to append new avro file: %v", err) + } + + // fmt.Printf("Codec : %#v\n", codec) + + writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ + W: f, + Codec: codec, + CompressionName: goavro.CompressionDeflateLabel, + }) + if err != nil { + return fmt.Errorf("failed to create OCF writer: %v", err) + } + + // Append the new record + if err := writer.Append(record_list); err != nil { + return fmt.Errorf("failed to append record: %v", err) + } + + f.Close() + + return nil +} + +func compareSchema(schemaRead, schemaGen string) (bool, string, error) { + var genSchema, readSchema AvroSchema + + if schemaRead == "" { + return false, schemaGen, nil + } + + // Unmarshal the schema strings into AvroSchema structs + if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil { + return false, "", fmt.Errorf("failed to parse generated schema: %v", err) + } + if err := 
json.Unmarshal([]byte(schemaRead), &readSchema); err != nil { + return false, "", fmt.Errorf("failed to parse read schema: %v", err) + } + + sort.Slice(genSchema.Fields, func(i, j int) bool { + return genSchema.Fields[i].Name < genSchema.Fields[j].Name + }) + + sort.Slice(readSchema.Fields, func(i, j int) bool { + return readSchema.Fields[i].Name < readSchema.Fields[j].Name + }) + + // Check if schemas are identical + schemasEqual := true + if len(genSchema.Fields) <= len(readSchema.Fields) { + + for i := range genSchema.Fields { + if genSchema.Fields[i].Name != readSchema.Fields[i].Name { + schemasEqual = false + break + } + } + + // If schemas are identical, return the read schema + if schemasEqual { + return false, schemaRead, nil + } + } + + // Create a map to hold unique fields from both schemas + fieldMap := make(map[string]AvroField) + + // Add fields from the read schema + for _, field := range readSchema.Fields { + fieldMap[field.Name] = field + } + + // Add or update fields from the generated schema + for _, field := range genSchema.Fields { + fieldMap[field.Name] = field + } + + // Create a union schema by collecting fields from the map + var mergedFields []AvroField + for _, field := range fieldMap { + mergedFields = append(mergedFields, field) + } + + // Sort fields by name for consistency + sort.Slice(mergedFields, func(i, j int) bool { + return mergedFields[i].Name < mergedFields[j].Name + }) + + // Create the merged schema + mergedSchema := AvroSchema{ + Type: "record", + Name: genSchema.Name, + Fields: mergedFields, + } + + // Check if schemas are identical + schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields) + if schemasEqual { + for i := range mergedSchema.Fields { + if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name { + schemasEqual = false + break + } + } + + if schemasEqual { + return false, schemaRead, nil + } + } + + // Marshal the merged schema back to JSON + mergedSchemaJson, err := json.Marshal(mergedSchema) + if err != nil { + return false, "", fmt.Errorf("failed to marshal merged schema: %v", err) + } + + return true, string(mergedSchemaJson), nil +} + +func generateSchema(data map[string]util.Float) (string, error) { + // Define the Avro schema structure + schema := map[string]interface{}{ + "type": "record", + "name": "DataRecord", + "fields": []map[string]interface{}{}, + } + + fieldTracker := make(map[string]struct{}) + + for key := range data { + if _, exists := fieldTracker[key]; !exists { + key = correctKey(key) + + field := map[string]interface{}{ + "name": key, + "type": "double", + "default": -1.0, + } + schema["fields"] = append(schema["fields"].([]map[string]interface{}), field) + fieldTracker[key] = struct{}{} + } + } + + schemaString, err := json.Marshal(schema) + if err != nil { + return "", fmt.Errorf("failed to marshal schema: %v", err) + } + + return string(schemaString), nil +} + +func generateRecord(data map[string]util.Float) map[string]interface{} { + record := make(map[string]interface{}) + + // Iterate through each map in data + for key, value := range data { + key = correctKey(key) + + // Set the value in the record + record[key] = value.Double() + } + + return record +} + +func correctKey(key string) string { + // Replace any invalid characters in the key + // For example, replace spaces with underscores + key = strings.ReplaceAll(key, ":", "___") + key = strings.ReplaceAll(key, ".", "__") + + return key +} + +func ReplaceKey(key string) string { + // Replace any invalid characters in the key + // For example, 
replace spaces with underscores + key = strings.ReplaceAll(key, "___", ":") + key = strings.ReplaceAll(key, "__", ".") + + return key +} diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go new file mode 100644 index 0000000..ee09759 --- /dev/null +++ b/internal/avro/avroHelper.go @@ -0,0 +1,79 @@ +package avro + +import ( + "context" + "fmt" + "strconv" + "sync" + +) + +func DataStaging(wg *sync.WaitGroup, ctx context.Context) { + + // AvroPool is a pool of Avro writers. + go func() { + if Keys.Checkpoints.FileFormat == "json" { + wg.Done() // Mark this goroutine as done + return // Exit the goroutine + } + + defer wg.Done() + + var avroLevel *AvroLevel + oldSelector := make([]string, 0) + + for { + select { + case <-ctx.Done(): + return + case val := <-LineProtocolMessages: + //Fetch the frequency of the metric from the global configuration + freq, err := Keys.GetMetricFrequency(val.MetricName) + if err != nil { + fmt.Printf("Error fetching metric frequency: %s\n", err) + continue + } + + metricName := "" + + for _, selector_name := range val.Selector { + metricName += selector_name + Delimiter + } + + metricName += val.MetricName + + // Create a new selector for the Avro level + // The selector is a slice of strings that represents the path to the + // Avro level. It is created by appending the cluster, node, and metric + // name to the selector. + var selector []string + selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) + + if !testEq(oldSelector, selector) { + // Get the Avro level for the metric + avroLevel = avroStore.root.findAvroLevelOrCreate(selector) + + // If the Avro level is nil, create a new one + if avroLevel == nil { + fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) + } + oldSelector = append([]string{}, selector...) + } + + avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) + } + } + }() +} + +func testEq(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/internal/avro/avroStruct.go b/internal/avro/avroStruct.go new file mode 100644 index 0000000..27aac47 --- /dev/null +++ b/internal/avro/avroStruct.go @@ -0,0 +1,163 @@ +package avro + +import ( + "sync" + + "github.com/ClusterCockpit/cc-lib/util" +) + +var ( + LineProtocolMessages = make(chan *AvroStruct) + Delimiter = "ZZZZZ" +) + +// CheckpointBufferMinutes should always be in minutes. +// Its controls the amount of data to hold for given amount of time. +var CheckpointBufferMinutes = 3 + +type AvroStruct struct { + MetricName string + Cluster string + Node string + Selector []string + Value util.Float + Timestamp int64 +} + +type AvroStore struct { + root AvroLevel +} + +var avroStore AvroStore + +type AvroLevel struct { + children map[string]*AvroLevel + data map[int64]map[string]util.Float + lock sync.RWMutex +} + +type AvroField struct { + Name string `json:"name"` + Type interface{} `json:"type"` + Default interface{} `json:"default,omitempty"` +} + +type AvroSchema struct { + Type string `json:"type"` + Name string `json:"name"` + Fields []AvroField `json:"fields"` +} + +func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { + if len(selector) == 0 { + return l + } + + // Allow concurrent reads: + l.lock.RLock() + var child *AvroLevel + var ok bool + if l.children == nil { + // Children map needs to be created... 
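+		// (drop the read lock here and fall through to the write-lock
+		// path below, which allocates the children map)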
+ l.lock.RUnlock() + } else { + child, ok := l.children[selector[0]] + l.lock.RUnlock() + if ok { + return child.findAvroLevelOrCreate(selector[1:]) + } + } + + // The level does not exist, take write lock for unqiue access: + l.lock.Lock() + // While this thread waited for the write lock, another thread + // could have created the child node. + if l.children != nil { + child, ok = l.children[selector[0]] + if ok { + l.lock.Unlock() + return child.findAvroLevelOrCreate(selector[1:]) + } + } + + child = &AvroLevel{ + data: make(map[int64]map[string]util.Float, 0), + children: nil, + } + + if l.children != nil { + l.children[selector[0]] = child + } else { + l.children = map[string]*AvroLevel{selector[0]: child} + } + l.lock.Unlock() + return child.findAvroLevelOrCreate(selector[1:]) +} + +func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64, Freq int) { + l.lock.Lock() + defer l.lock.Unlock() + + KeyCounter := int(CheckpointBufferMinutes * 60 / Freq) + + // Create keys in advance for the given amount of time + if len(l.data) != KeyCounter { + if len(l.data) == 0 { + for i := range KeyCounter { + l.data[timestamp+int64(i*Freq)] = make(map[string]util.Float, 0) + } + } else { + // Get the last timestamp + var lastTs int64 + for ts := range l.data { + if ts > lastTs { + lastTs = ts + } + } + // Create keys for the next KeyCounter timestamps + l.data[lastTs+int64(Freq)] = make(map[string]util.Float, 0) + } + } + + closestTs := int64(0) + minDiff := int64(Freq) + 1 // Start with diff just outside the valid range + found := false + + // Iterate over timestamps and choose the one which is within range. + // Since its epoch time, we check if the difference is less than 60 seconds. + for ts, dat := range l.data { + // Check if timestamp is within range + diff := timestamp - ts + if diff < -int64(Freq) || diff > int64(Freq) { + continue + } + + // Metric already present at this timestamp — skip + if _, ok := dat[metricName]; ok { + continue + } + + // Check if this is the closest timestamp so far + if Abs(diff) < minDiff { + minDiff = Abs(diff) + closestTs = ts + found = true + } + } + + if found { + l.data[closestTs][metricName] = value + } +} + +func GetAvroStore() *AvroStore { + return &avroStore +} + +// Abs returns the absolute value of x. +func Abs(x int64) int64 { + if x < 0 { + return -x + } + return x +} diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go new file mode 100644 index 0000000..6e25aff --- /dev/null +++ b/internal/memorystore/archive.go @@ -0,0 +1,190 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
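+//
+// Package memorystore keeps recent metric data in an in-memory tree of
+// buffers. This file implements the archiving loop: checkpoint files older
+// than the configured archive interval are bundled into "<from>.zip" under
+// the archive directory, or deleted right away when delete-instead is set.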
+package memorystore + +import ( + "archive/zip" + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" +) + +func Archiving(wg *sync.WaitGroup, ctx context.Context) { + go func() { + defer wg.Done() + d, err := time.ParseDuration(Keys.Archive.Interval) + if err != nil { + cclog.Fatalf("error parsing archive interval duration: %v\n", err) + } + if d <= 0 { + return + } + + ticks := func() <-chan time.Time { + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + t := time.Now().Add(-d) + cclog.Infof("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) + n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, + Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) + + if err != nil { + cclog.Warnf("archiving failed: %s\n", err.Error()) + } else { + cclog.Infof("done: %d files zipped and moved to archive\n", n) + } + } + } + }() +} + +var ErrNoNewData error = errors.New("all data already archived") + +// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`, +// deleting them from the `checkpointsDir`. +func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) { + entries1, err := os.ReadDir(checkpointsDir) + if err != nil { + return 0, err + } + + type workItem struct { + cdir, adir string + cluster, host string + } + + var wg sync.WaitGroup + n, errs := int32(0), int32(0) + work := make(chan workItem, NumWorkers) + + wg.Add(NumWorkers) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + for workItem := range work { + m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead) + if err != nil { + cclog.Errorf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error()) + atomic.AddInt32(&errs, 1) + } + atomic.AddInt32(&n, int32(m)) + } + }() + } + + for _, de1 := range entries1 { + entries2, e := os.ReadDir(filepath.Join(checkpointsDir, de1.Name())) + if e != nil { + err = e + } + + for _, de2 := range entries2 { + cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name()) + adir := filepath.Join(archiveDir, de1.Name(), de2.Name()) + work <- workItem{ + adir: adir, cdir: cdir, + cluster: de1.Name(), host: de2.Name(), + } + } + } + + close(work) + wg.Wait() + + if err != nil { + return int(n), err + } + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while archiving (%d successes)", errs, n) + } + return int(n), nil +} + +// Helper function for `ArchiveCheckpoints`. 
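+// It handles a single "<cluster>/<host>" checkpoint directory: files older
+// than `from` are either removed (deleteInstead) or streamed into one
+// "<from>.zip" inside the matching archive directory; the number of files
+// processed is returned.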
+func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead bool) (int, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return 0, err + } + + extension := Keys.Checkpoints.FileFormat + files, err := findFiles(entries, from, extension, false) + if err != nil { + return 0, err + } + + if deleteInstead { + n := 0 + for _, checkpoint := range files { + filename := filepath.Join(dir, checkpoint) + if err = os.Remove(filename); err != nil { + return n, err + } + n += 1 + } + return n, nil + } + + filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from)) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(archiveDir, 0o755) + if err == nil { + f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) + } + } + if err != nil { + return 0, err + } + defer f.Close() + bw := bufio.NewWriter(f) + defer bw.Flush() + zw := zip.NewWriter(bw) + defer zw.Close() + + n := 0 + for _, checkpoint := range files { + filename := filepath.Join(dir, checkpoint) + r, err := os.Open(filename) + if err != nil { + return n, err + } + defer r.Close() + + w, err := zw.Create(checkpoint) + if err != nil { + return n, err + } + + if _, err = io.Copy(w, r); err != nil { + return n, err + } + + if err = os.Remove(filename); err != nil { + return n, err + } + n += 1 + } + + return n, nil +} diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go new file mode 100644 index 0000000..d084c6d --- /dev/null +++ b/internal/memorystore/buffer.go @@ -0,0 +1,233 @@ +package memorystore + +import ( + "errors" + "sync" + + "github.com/ClusterCockpit/cc-lib/util" +) + +// Default buffer capacity. +// `buffer.data` will only ever grow up to it's capacity and a new link +// in the buffer chain will be created if needed so that no copying +// of data or reallocation needs to happen on writes. +const ( + BUFFER_CAP int = 512 +) + +// So that we can reuse allocations +var bufferPool sync.Pool = sync.Pool{ + New: func() interface{} { + return &buffer{ + data: make([]util.Float, 0, BUFFER_CAP), + } + }, +} + +var ( + ErrNoData error = errors.New("no data for this metric/level") + ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align") +) + +// Each metric on each level has it's own buffer. +// This is where the actual values go. +// If `cap(data)` is reached, a new buffer is created and +// becomes the new head of a buffer list. +type buffer struct { + prev *buffer + next *buffer + data []util.Float + frequency int64 + start int64 + archived bool + closed bool +} + +func newBuffer(ts, freq int64) *buffer { + b := bufferPool.Get().(*buffer) + b.frequency = freq + b.start = ts - (freq / 2) + b.prev = nil + b.next = nil + b.archived = false + b.closed = false + b.data = b.data[:0] + return b +} + +// If a new buffer was created, the new head is returnd. +// Otherwise, the existing buffer is returnd. +// Normaly, only "newer" data should be written, but if the value would +// end up in the same buffer anyways it is allowed. 
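+//
+// The slot for a timestamp is found by plain integer division, roughly
+// (the values below are only an example, assuming start=1000 and frequency=60):
+//
+//	idx := int((ts - b.start) / b.frequency) // ts=1130 -> idx=2
+//
+// Slots skipped between the last write and idx are padded with NaN; once idx
+// would exceed the buffer capacity, a fresh buffer becomes the new head.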
+func (b *buffer) write(ts int64, value util.Float) (*buffer, error) { + if ts < b.start { + return nil, errors.New("cannot write value to buffer from past") + } + + // idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) + idx := int((ts - b.start) / b.frequency) + if idx >= cap(b.data) { + newbuf := newBuffer(ts, b.frequency) + newbuf.prev = b + b.next = newbuf + b.close() + b = newbuf + idx = 0 + } + + // Overwriting value or writing value from past + if idx < len(b.data) { + b.data[idx] = value + return b, nil + } + + // Fill up unwritten slots with NaN + for i := len(b.data); i < idx; i++ { + b.data = append(b.data, util.NaN) + } + + b.data = append(b.data, value) + return b, nil +} + +func (b *buffer) end() int64 { + return b.firstWrite() + int64(len(b.data))*b.frequency +} + +func (b *buffer) firstWrite() int64 { + return b.start + (b.frequency / 2) +} + +func (b *buffer) close() {} + +/* +func (b *buffer) close() { + if b.closed { + return + } + + b.closed = true + n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64 + for _, x := range b.data { + if x.IsNaN() { + continue + } + + n += 1 + f := float64(x) + sum += f + min = math.Min(min, f) + max = math.Max(max, f) + } + + b.statisticts.samples = n + if n > 0 { + b.statisticts.avg = Float(sum / float64(n)) + b.statisticts.min = Float(min) + b.statisticts.max = Float(max) + } else { + b.statisticts.avg = NaN + b.statisticts.min = NaN + b.statisticts.max = NaN + } +} +*/ + +// func interpolate(idx int, data []Float) Float { +// if idx == 0 || idx+1 == len(data) { +// return NaN +// } +// return (data[idx-1] + data[idx+1]) / 2.0 +// } + +// Return all known values from `from` to `to`. Gaps of information are represented as NaN. +// Simple linear interpolation is done between the two neighboring cells if possible. +// If values at the start or end are missing, instead of NaN values, the second and thrid +// return values contain the actual `from`/`to`. +// This function goes back the buffer chain if `from` is older than the currents buffer start. +// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. +// If `data` is not long enough to hold all values, this function will panic! +func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, int64, error) { + if from < b.firstWrite() { + if b.prev != nil { + return b.prev.read(from, to, data) + } + from = b.firstWrite() + } + + i := 0 + t := from + for ; t < to; t += b.frequency { + idx := int((t - b.start) / b.frequency) + if idx >= cap(b.data) { + if b.next == nil { + break + } + b = b.next + idx = 0 + } + + if idx >= len(b.data) { + if b.next == nil || to <= b.next.start { + break + } + data[i] += util.NaN + } else if t < b.start { + data[i] += util.NaN + // } else if b.data[idx].IsNaN() { + // data[i] += interpolate(idx, b.data) + } else { + data[i] += b.data[idx] + } + i++ + } + + return data[:i], from, t, nil +} + +// Returns true if this buffer needs to be freed. +func (b *buffer) free(t int64) (delme bool, n int) { + if b.prev != nil { + delme, m := b.prev.free(t) + n += m + if delme { + b.prev.next = nil + if cap(b.prev.data) == BUFFER_CAP { + bufferPool.Put(b.prev) + } + b.prev = nil + } + } + + end := b.end() + if end < t { + return true, n + 1 + } + + return false, n +} + +// Call `callback` on every buffer that contains data in the range from `from` to `to`. 
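+// The chain is walked from the oldest buffer (via prev) up to this one, so
+// `callback` sees the buffers in chronological order.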
+func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { + if b == nil { + return nil + } + + if err := b.prev.iterFromTo(from, to, callback); err != nil { + return err + } + + if from <= b.end() && b.start <= to { + return callback(b) + } + + return nil +} + +func (b *buffer) count() int64 { + res := int64(len(b.data)) + if b.prev != nil { + res += b.prev.count() + } + return res +} diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go new file mode 100644 index 0000000..ecd6fb1 --- /dev/null +++ b/internal/memorystore/checkpoint.go @@ -0,0 +1,764 @@ +package memorystore + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log" + "os" + "path" + "path/filepath" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/avro" + "github.com/ClusterCockpit/cc-lib/util" + "github.com/linkedin/goavro/v2" +) + +// Whenever changed, update MarshalJSON as well! +type CheckpointMetrics struct { + Data []util.Float `json:"data"` + Frequency int64 `json:"frequency"` + Start int64 `json:"start"` +} + +type CheckpointFile struct { + Metrics map[string]*CheckpointMetrics `json:"metrics"` + Children map[string]*CheckpointFile `json:"children"` + From int64 `json:"from"` + To int64 `json:"to"` +} + +var lastCheckpoint time.Time + +func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { + lastCheckpoint = time.Now() + + if Keys.Checkpoints.FileFormat == "json" { + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d, err := time.ParseDuration(Keys.Checkpoints.Interval) + if err != nil { + log.Fatal(err) + } + if d <= 0 { + return + } + + ticks := func() <-chan time.Time { + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) + now := time.Now() + n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, + lastCheckpoint.Unix(), now.Unix()) + if err != nil { + log.Printf("checkpointing failed: %s\n", err.Error()) + } else { + log.Printf("done: %d checkpoint files created\n", n) + lastCheckpoint = now + } + } + } + }() + } else { + go func() { + defer wg.Done() + d, _ := time.ParseDuration("1m") + + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute): + // This is the first tick untill we collect the data for given minutes. + avro.GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) + // log.Printf("Checkpointing %d avro files", count) + + } + + ticks := func() <-chan time.Time { + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + + for { + select { + case <-ctx.Done(): + return + case <-ticks: + // Regular ticks of 1 minute to write data. + avro.GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) + // log.Printf("Checkpointing %d avro files", count) + } + } + }() + } +} + +// As `Float` implements a custom MarshalJSON() function, +// serializing an array of such types has more overhead +// than one would assume (because of extra allocations, interfaces and so on). +func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { + buf := make([]byte, 0, 128+len(cm.Data)*8) + buf = append(buf, `{"frequency":`...) + buf = strconv.AppendInt(buf, cm.Frequency, 10) + buf = append(buf, `,"start":`...) + buf = strconv.AppendInt(buf, cm.Start, 10) + buf = append(buf, `,"data":[`...) 
+ for i, x := range cm.Data { + if i != 0 { + buf = append(buf, ',') + } + if x.IsNaN() { + buf = append(buf, `null`...) + } else { + buf = strconv.AppendFloat(buf, float64(x), 'f', 1, 32) + } + } + buf = append(buf, `]}`...) + return buf, nil +} + +// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! +// On a per-host basis a new JSON file is created. I have no idea if this will scale. +// The good thing: Only a host at a time is locked, so this function can run +// in parallel to writes/reads. +func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { + levels := make([]*Level, 0) + selectors := make([][]string, 0) + m.root.lock.RLock() + for sel1, l1 := range m.root.children { + l1.lock.RLock() + for sel2, l2 := range l1.children { + levels = append(levels, l2) + selectors = append(selectors, []string{sel1, sel2}) + } + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + type workItem struct { + level *Level + dir string + selector []string + } + + n, errs := int32(0), int32(0) + + var wg sync.WaitGroup + wg.Add(NumWorkers) + work := make(chan workItem, NumWorkers*2) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + + for workItem := range work { + if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil { + if err == ErrNoNewData { + continue + } + + log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } + } + }() + } + + for i := 0; i < len(levels); i++ { + dir := path.Join(dir, path.Join(selectors[i]...)) + work <- workItem{ + level: levels[i], + dir: dir, + selector: selectors[i], + } + } + + close(work) + wg.Wait() + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { + l.lock.RLock() + defer l.lock.RUnlock() + + retval := &CheckpointFile{ + From: from, + To: to, + Metrics: make(map[string]*CheckpointMetrics), + Children: make(map[string]*CheckpointFile), + } + + for metric, minfo := range m.Metrics { + b := l.metrics[minfo.Offset] + if b == nil { + continue + } + + allArchived := true + b.iterFromTo(from, to, func(b *buffer) error { + if !b.archived { + allArchived = false + } + return nil + }) + + if allArchived { + continue + } + + data := make([]util.Float, (to-from)/b.frequency+1) + data, start, end, err := b.read(from, to, data) + if err != nil { + return nil, err + } + + for i := int((end - start) / b.frequency); i < len(data); i++ { + data[i] = util.NaN + } + + retval.Metrics[metric] = &CheckpointMetrics{ + Frequency: b.frequency, + Start: start, + Data: data, + } + } + + for name, child := range l.children { + val, err := child.toCheckpointFile(from, to, m) + if err != nil { + return nil, err + } + + if val != nil { + retval.Children[name] = val + } + } + + if len(retval.Children) == 0 && len(retval.Metrics) == 0 { + return nil, nil + } + + return retval, nil +} + +func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { + cf, err := l.toCheckpointFile(from, to, m) + if err != nil { + return err + } + + if cf == nil { + return ErrNoNewData + } + + filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dir, 0o755) + if err == nil { + f, err = 
os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + } + } + if err != nil { + return err + } + defer f.Close() + + bw := bufio.NewWriter(f) + if err = json.NewEncoder(bw).Encode(cf); err != nil { + return err + } + + return bw.Flush() +} + +func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { + var wg sync.WaitGroup + work := make(chan [2]string, NumWorkers) + n, errs := int32(0), int32(0) + + wg.Add(NumWorkers) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + for host := range work { + lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) + nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) + if err != nil { + log.Fatalf("error while loading checkpoints: %s", err.Error()) + atomic.AddInt32(&errs, 1) + } + atomic.AddInt32(&n, int32(nn)) + } + }() + } + + i := 0 + clustersDir, err := os.ReadDir(dir) + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + err = errors.New("expected only directories at first level of checkpoints/ directory") + goto done + } + + hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if e != nil { + err = e + goto done + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + err = errors.New("expected only directories at second level of checkpoints/ directory") + goto done + } + + i++ + if i%NumWorkers == 0 && i > 100 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + runtime.GC() + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } +done: + close(work) + wg.Wait() + + if err != nil { + return int(n), err + } + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! +// This function can only be called once and before the very first write or read. +// Different host's data is loaded to memory in parallel. 
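+//
+// The directory is probed for the configured checkpoint file format first
+// (defaulting to "avro") and, if no such files exist, for the opposite
+// format ("json" vs. "avro"), so a restart still finds old checkpoints after
+// the file format was switched.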
+func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { + if _, err := os.Stat(dir); os.IsNotExist(err) { + // The directory does not exist, so create it using os.MkdirAll() + err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory + if err != nil { + log.Fatalf("Error creating directory: %#v\n", err) + } + fmt.Printf("%#v Directory created successfully.\n", dir) + } + + // Config read (replace with your actual config read) + fileFormat := Keys.Checkpoints.FileFormat + if fileFormat == "" { + fileFormat = "avro" + } + + // Map to easily get the fallback format + oppositeFormat := map[string]string{ + "json": "avro", + "avro": "json", + } + + // First, attempt to load the specified format + if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { + return 0, fmt.Errorf("error checking files with extension: %v", err) + } else if found { + log.Printf("Loading %s files because fileformat is %s\n", fileFormat, fileFormat) + return m.FromCheckpoint(dir, from, fileFormat) + } + + // If not found, attempt the opposite format + altFormat := oppositeFormat[fileFormat] + if found, err := checkFilesWithExtension(dir, altFormat); err != nil { + return 0, fmt.Errorf("error checking files with extension: %v", err) + } else if found { + log.Printf("Loading %s files but fileformat is %s\n", altFormat, fileFormat) + return m.FromCheckpoint(dir, from, altFormat) + } + + log.Println("No valid checkpoint files found in the directory.") + return 0, nil +} + +func checkFilesWithExtension(dir string, extension string) (bool, error) { + found := false + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error accessing path %s: %v", path, err) + } + if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension { + found = true + return nil + } + return nil + }) + if err != nil { + return false, fmt.Errorf("error walking through directories: %s", err) + } + + return found, nil +} + +func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { + br := bufio.NewReader(f) + + fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:] + resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64) + if err != nil { + return fmt.Errorf("error while reading avro file (resolution parsing) : %s", err) + } + + from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) + + // Same logic according to lineprotocol + from_timestamp -= (resolution / 2) + + if err != nil { + return fmt.Errorf("error converting timestamp from the avro file : %s", err) + } + + // fmt.Printf("File : %s with resolution : %d\n", fileName, resolution) + + var recordCounter int64 = 0 + + // Create a new OCF reader from the buffered reader + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + panic(err) + } + + metricsData := make(map[string]util.FloatArray) + + for ocfReader.Scan() { + datum, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("error while reading avro file : %s", err) + } + + record, ok := datum.(map[string]interface{}) + if !ok { + panic("failed to assert datum as map[string]interface{}") + } + + for key, value := range record { + metricsData[key] = append(metricsData[key], util.ConvertToFloat(value.(float64))) + } + + recordCounter += 1 + } + + to := (from_timestamp + (recordCounter / (60 / resolution) * 60)) + if to < from { + return nil + } + + for key, floatArray := range metricsData { + 
metricName := avro.ReplaceKey(key) + + if strings.Contains(metricName, avro.Delimiter) { + subString := strings.Split(metricName, avro.Delimiter) + + lvl := l + + for i := 0; i < len(subString)-1; i++ { + + sel := subString[i] + + if lvl.children == nil { + lvl.children = make(map[string]*Level) + } + + child, ok := lvl.children[sel] + if !ok { + child = &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: nil, + } + lvl.children[sel] = child + } + lvl = child + } + + leafMetricName := subString[len(subString)-1] + err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution) + if err != nil { + return fmt.Errorf("error while creating buffers from avroReader : %s", err) + } + } else { + err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution) + if err != nil { + return fmt.Errorf("error while creating buffers from avroReader : %s", err) + } + } + + } + + return nil +} + +func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray util.FloatArray, from int64, resolution int64) error { + n := len(floatArray) + b := &buffer{ + frequency: resolution, + start: from, + data: floatArray[0:n:n], + prev: nil, + next: nil, + archived: true, + } + b.close() + + minfo, ok := m.Metrics[metricName] + if !ok { + return nil + // return errors.New("Unkown metric: " + name) + } + + prev := l.metrics[minfo.Offset] + if prev == nil { + l.metrics[minfo.Offset] = b + } else { + if prev.start > b.start { + return errors.New("wooops") + } + + b.prev = prev + prev.next = b + + missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency)) + if missingCount > 0 { + missingCount /= int(b.frequency) + + for range missingCount { + prev.data = append(prev.data, util.NaN) + } + + prev.data = prev.data[0:len(prev.data):len(prev.data)] + } + } + l.metrics[minfo.Offset] = b + + return nil +} + +func (l *Level) loadJsonFile(m *MemoryStore, f *os.File, from int64) error { + br := bufio.NewReader(f) + cf := &CheckpointFile{} + if err := json.NewDecoder(br).Decode(cf); err != nil { + return err + } + + if cf.To != 0 && cf.To < from { + return nil + } + + if err := l.loadFile(cf, m); err != nil { + return err + } + + return nil +} + +func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { + for name, metric := range cf.Metrics { + n := len(metric.Data) + b := &buffer{ + frequency: metric.Frequency, + start: metric.Start, + data: metric.Data[0:n:n], // Space is wasted here :( + prev: nil, + next: nil, + archived: true, + } + b.close() + + minfo, ok := m.Metrics[name] + if !ok { + continue + // return errors.New("Unkown metric: " + name) + } + + prev := l.metrics[minfo.Offset] + if prev == nil { + l.metrics[minfo.Offset] = b + } else { + if prev.start > b.start { + return errors.New("wooops") + } + + b.prev = prev + prev.next = b + } + l.metrics[minfo.Offset] = b + } + + if len(cf.Children) > 0 && l.children == nil { + l.children = make(map[string]*Level) + } + + for sel, childCf := range cf.Children { + child, ok := l.children[sel] + if !ok { + child = &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: nil, + } + l.children[sel] = child + } + + if err := child.loadFile(childCf, m); err != nil { + return err + } + } + + return nil +} + +func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) { + direntries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + + return 0, err + } + + allFiles := make([]fs.DirEntry, 0) + filesLoaded := 0 + 
for _, e := range direntries { + if e.IsDir() { + child := &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: make(map[string]*Level), + } + + files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension) + filesLoaded += files + if err != nil { + return filesLoaded, err + } + + l.children[e.Name()] = child + } else if strings.HasSuffix(e.Name(), "."+extension) { + allFiles = append(allFiles, e) + } else { + continue + } + } + + files, err := findFiles(allFiles, from, extension, true) + if err != nil { + return filesLoaded, err + } + + loaders := map[string]func(*MemoryStore, *os.File, int64) error{ + "json": l.loadJsonFile, + "avro": l.loadAvroFile, + } + + loader := loaders[extension] + + for _, filename := range files { + f, err := os.Open(path.Join(dir, filename)) + if err != nil { + return filesLoaded, err + } + defer f.Close() + + if err = loader(m, f, from); err != nil { + return filesLoaded, err + } + + filesLoaded += 1 + } + + return filesLoaded, nil +} + +// This will probably get very slow over time! +// A solution could be some sort of an index file in which all other files +// and the timespan they contain is listed. +func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) { + nums := map[string]int64{} + for _, e := range direntries { + if !strings.HasSuffix(e.Name(), "."+extension) { + continue + } + + ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64) + if err != nil { + return nil, err + } + nums[e.Name()] = ts + } + + sort.Slice(direntries, func(i, j int) bool { + a, b := direntries[i], direntries[j] + return nums[a.Name()] < nums[b.Name()] + }) + + filenames := make([]string, 0) + for i := 0; i < len(direntries); i++ { + e := direntries[i] + ts1 := nums[e.Name()] + + if findMoreRecentFiles && t <= ts1 { + filenames = append(filenames, e.Name()) + } + if i == len(direntries)-1 { + continue + } + + enext := direntries[i+1] + ts2 := nums[enext.Name()] + + if findMoreRecentFiles { + if ts1 < t && t < ts2 { + filenames = append(filenames, e.Name()) + } + } else { + if ts2 < t { + filenames = append(filenames, e.Name()) + } + } + } + + return filenames, nil +} diff --git a/internal/memorystore/config.go b/internal/memorystore/config.go new file mode 100644 index 0000000..0d8a8ab --- /dev/null +++ b/internal/memorystore/config.go @@ -0,0 +1,26 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
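+//
+// A minimal sketch of the corresponding JSON section (key names follow the
+// struct tags below, the concrete values are only examples):
+//
+//	{
+//	  "checkpoints": { "file-format": "avro", "interval": "12h", "directory": "./var/checkpoints" },
+//	  "retention-in-memory": "48h",
+//	  "archive": { "interval": "168h", "directory": "./var/archive", "delete-instead": false }
+//	}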
+package memorystore + +type MetricStoreConfig struct { + Checkpoints struct { + FileFormat string `json:"file-format"` + Interval string `json:"interval"` + RootDir string `json:"directory"` + Restore string `json:"restore"` + } `json:"checkpoints"` + Debug struct { + DumpToFile string `json:"dump-to-file"` + EnableGops bool `json:"gops"` + } `json:"debug"` + RetentionInMemory string `json:"retention-in-memory"` + Archive struct { + Interval string `json:"interval"` + RootDir string `json:"directory"` + DeleteInstead bool `json:"delete-instead"` + } `json:"archive"` +} + +var Keys MetricStoreConfig diff --git a/internal/memorystore/debug.go b/internal/memorystore/debug.go new file mode 100644 index 0000000..2743a45 --- /dev/null +++ b/internal/memorystore/debug.go @@ -0,0 +1,107 @@ +package memorystore + +import ( + "bufio" + "fmt" + "strconv" +) + +func (b *buffer) debugDump(buf []byte) []byte { + if b.prev != nil { + buf = b.prev.debugDump(buf) + } + + start, len, end := b.start, len(b.data), b.start+b.frequency*int64(len(b.data)) + buf = append(buf, `{"start":`...) + buf = strconv.AppendInt(buf, start, 10) + buf = append(buf, `,"len":`...) + buf = strconv.AppendInt(buf, int64(len), 10) + buf = append(buf, `,"end":`...) + buf = strconv.AppendInt(buf, end, 10) + if b.archived { + buf = append(buf, `,"saved":true`...) + } + if b.next != nil { + buf = append(buf, `},`...) + } else { + buf = append(buf, `}`...) + } + return buf +} + +func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { + l.lock.RLock() + defer l.lock.RUnlock() + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } + buf = append(buf, '"') + buf = append(buf, lvlname...) + buf = append(buf, "\":{\n"...) + depth += 1 + objitems := 0 + for name, mc := range m.Metrics { + if b := l.metrics[mc.Offset]; b != nil { + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } + + buf = append(buf, '"') + buf = append(buf, name...) + buf = append(buf, `":[`...) + buf = b.debugDump(buf) + buf = append(buf, "],\n"...) + objitems++ + } + } + + for name, lvl := range l.children { + _, err := w.Write(buf) + if err != nil { + return nil, err + } + + buf = buf[0:0] + buf, err = lvl.debugDump(m, w, name, buf, depth) + if err != nil { + return nil, err + } + + buf = append(buf, ',', '\n') + objitems++ + } + + // remove final `,`: + if objitems > 0 { + buf = append(buf[0:len(buf)-1], '\n') + } + + depth -= 1 + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } + buf = append(buf, '}') + return buf, nil +} + +func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error { + lvl := m.root.findLevel(selector) + if lvl == nil { + return fmt.Errorf("not found: %#v", selector) + } + + buf := make([]byte, 0, 2048) + buf = append(buf, "{"...) + + buf, err := lvl.debugDump(m, w, "data", buf, 0) + if err != nil { + return err + } + + buf = append(buf, "}\n"...) + if _, err = w.Write(buf); err != nil { + return err + } + + return w.Flush() +} diff --git a/internal/memorystore/healthcheck.go b/internal/memorystore/healthcheck.go new file mode 100644 index 0000000..cb22d49 --- /dev/null +++ b/internal/memorystore/healthcheck.go @@ -0,0 +1,88 @@ +package memorystore + +import ( + "bufio" + "fmt" + "time" +) + +// This is a threshold that allows a node to be healthy with certain number of data points missing. +// Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a +// node is healthy. 
Anything more than 5 missing points in metrics of the node will deem the node unhealthy. +const MaxMissingDataPoints int64 = 5 + +// This is a threshold which allows upto certain number of metrics in a node to be unhealthly. +// Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last +// MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does +// not receive data for MaxMissingDataPoints data points will deem the node unhealthy. +const MaxUnhealthyMetrics int64 = 5 + +func (b *buffer) healthCheck() int64 { + + // Check if the buffer is empty + if b.data == nil { + return 1 + } + + buffer_end := b.start + b.frequency*int64(len(b.data)) + t := time.Now().Unix() + + // Check if the buffer is too old + if t-buffer_end > MaxMissingDataPoints*b.frequency { + return 1 + } + + return 0 +} + +func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) { + l.lock.RLock() + defer l.lock.RUnlock() + + for _, mc := range m.Metrics { + if b := l.metrics[mc.Offset]; b != nil { + count += b.healthCheck() + } + } + + for _, lvl := range l.children { + c, err := lvl.healthCheck(m, 0) + if err != nil { + return 0, err + } + count += c + } + + return count, nil +} + +func (m *MemoryStore) HealthCheck(w *bufio.Writer, selector []string) error { + lvl := m.root.findLevel(selector) + if lvl == nil { + return fmt.Errorf("not found: %#v", selector) + } + + buf := make([]byte, 0, 25) + // buf = append(buf, "{"...) + + var count int64 = 0 + + unhealthyMetricsCount, err := lvl.healthCheck(m, count) + if err != nil { + return err + } + + if unhealthyMetricsCount < MaxUnhealthyMetrics { + buf = append(buf, "Healthy"...) + } else { + buf = append(buf, "Unhealthy"...) + } + + // buf = append(buf, "}\n"...) + + if _, err = w.Write(buf); err != nil { + return err + } + + return w.Flush() +} diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go new file mode 100644 index 0000000..76916e6 --- /dev/null +++ b/internal/memorystore/level.go @@ -0,0 +1,187 @@ +package memorystore + +import ( + "sync" + "unsafe" + + "github.com/ClusterCockpit/cc-lib/util" +) + +// Could also be called "node" as this forms a node in a tree structure. +// Called Level because "node" might be confusing here. +// Can be both a leaf or a inner node. In this tree structue, inner nodes can +// also hold data (in `metrics`). +type Level struct { + children map[string]*Level + metrics []*buffer + lock sync.RWMutex +} + +// Find the correct level for the given selector, creating it if +// it does not exist. Example selector in the context of the +// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. +// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? +func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { + if len(selector) == 0 { + return l + } + + // Allow concurrent reads: + l.lock.RLock() + var child *Level + var ok bool + if l.children == nil { + // Children map needs to be created... + l.lock.RUnlock() + } else { + child, ok := l.children[selector[0]] + l.lock.RUnlock() + if ok { + return child.findLevelOrCreate(selector[1:], nMetrics) + } + } + + // The level does not exist, take write lock for unqiue access: + l.lock.Lock() + // While this thread waited for the write lock, another thread + // could have created the child node. 
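+	// Re-check under the write lock before allocating a new child level.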
+ if l.children != nil { + child, ok = l.children[selector[0]] + if ok { + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:], nMetrics) + } + } + + child = &Level{ + metrics: make([]*buffer, nMetrics), + children: nil, + } + + if l.children != nil { + l.children[selector[0]] = child + } else { + l.children = map[string]*Level{selector[0]: child} + } + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:], nMetrics) +} + +func (l *Level) free(t int64) (int, error) { + l.lock.Lock() + defer l.lock.Unlock() + + n := 0 + for i, b := range l.metrics { + if b != nil { + delme, m := b.free(t) + n += m + if delme { + if cap(b.data) == BUFFER_CAP { + bufferPool.Put(b) + } + l.metrics[i] = nil + } + } + } + + for _, l := range l.children { + m, err := l.free(t) + n += m + if err != nil { + return n, err + } + } + + return n, nil +} + +func (l *Level) sizeInBytes() int64 { + l.lock.RLock() + defer l.lock.RUnlock() + size := int64(0) + + for _, b := range l.metrics { + if b != nil { + size += b.count() * int64(unsafe.Sizeof(util.Float(0))) + } + } + + for _, child := range l.children { + size += child.sizeInBytes() + } + + return size +} + +func (l *Level) findLevel(selector []string) *Level { + if len(selector) == 0 { + return l + } + + l.lock.RLock() + defer l.lock.RUnlock() + + lvl := l.children[selector[0]] + if lvl == nil { + return nil + } + + return lvl.findLevel(selector[1:]) +} + +func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error { + l.lock.RLock() + defer l.lock.RUnlock() + + if len(selector) == 0 { + b := l.metrics[offset] + if b != nil { + return f(b) + } + + for _, lvl := range l.children { + err := lvl.findBuffers(nil, offset, f) + if err != nil { + return err + } + } + return nil + } + + sel := selector[0] + if len(sel.String) != 0 && l.children != nil { + lvl, ok := l.children[sel.String] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + return nil + } + + if sel.Group != nil && l.children != nil { + for _, key := range sel.Group { + lvl, ok := l.children[key] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + } + return nil + } + + if sel.Any && l.children != nil { + for _, lvl := range l.children { + if err := lvl.findBuffers(selector[1:], offset, f); err != nil { + return err + } + } + return nil + } + + return nil +} diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go new file mode 100644 index 0000000..7659a89 --- /dev/null +++ b/internal/memorystore/memorystore.go @@ -0,0 +1,372 @@ +package memorystore + +import ( + "context" + "errors" + "log" + "runtime" + "sync" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/avro" + "github.com/ClusterCockpit/cc-lib/resampler" + "github.com/ClusterCockpit/cc-lib/util" + "github.com/ClusterCockpit/cc-metric-store/internal/config" +) + +var ( + singleton sync.Once + msInstance *MemoryStore +) + +var NumWorkers int = 4 + +func init() { + maxWorkers := 10 + NumWorkers = runtime.NumCPU()/2 + 1 + if NumWorkers > maxWorkers { + NumWorkers = maxWorkers + } +} + +type Metric struct { + Name string + Value util.Float + MetricConfig config.MetricConfig +} + +type MemoryStore struct { + Metrics map[string]config.MetricConfig + root Level +} + +// Create a new, initialized instance of a MemoryStore. +// Will panic if values in the metric configurations are invalid. 
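+//
+// Init is guarded by sync.Once, so only the first call has an effect; it
+// assigns each metric configuration its offset into the per-level buffer
+// slice. A rough usage sketch (metric name and frequency are just examples):
+//
+//	memorystore.Init(map[string]config.MetricConfig{
+//		"flops_any": {Frequency: 60, Aggregation: config.AvgAggregation},
+//	})
+//	ms := memorystore.GetMemoryStore()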
+func Init(metrics map[string]config.MetricConfig) { + singleton.Do(func() { + offset := 0 + for key, cfg := range metrics { + if cfg.Frequency == 0 { + panic("invalid frequency") + } + + metrics[key] = config.MetricConfig{ + Frequency: cfg.Frequency, + Aggregation: cfg.Aggregation, + Offset: offset, + } + offset += 1 + } + + msInstance = &MemoryStore{ + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + Metrics: metrics, + } + }) +} + +func GetMemoryStore() *MemoryStore { + if msInstance == nil { + log.Fatalf("MemoryStore not initialized!") + } + + return msInstance +} + +func Shutdown() { + log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir) + var files int + var err error + + ms := GetMemoryStore() + + if config.Keys.Checkpoints.FileFormat == "json" { + files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + } else { + files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true) + close(avro.LineProtocolMessages) + } + + if err != nil { + log.Printf("Writing checkpoint failed: %s\n", err.Error()) + } + log.Printf("Done! (%d files written)\n", files) + + // ms.PrintHeirarchy() +} + +// func (m *MemoryStore) PrintHeirarchy() { +// m.root.lock.Lock() +// defer m.root.lock.Unlock() + +// fmt.Printf("Root : \n") + +// for lvl1, sel1 := range m.root.children { +// fmt.Printf("\t%s\n", lvl1) +// for lvl2, sel2 := range sel1.children { +// fmt.Printf("\t\t%s\n", lvl2) +// if lvl1 == "fritz" && lvl2 == "f0201" { + +// for name, met := range m.Metrics { +// mt := sel2.metrics[met.Offset] + +// fmt.Printf("\t\t\t\t%s\n", name) +// fmt.Printf("\t\t\t\t") + +// for mt != nil { +// // if name == "cpu_load" { +// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) +// // } +// mt = mt.prev +// } +// fmt.Printf("\n") + +// } +// } +// for lvl3, sel3 := range sel2.children { +// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" { + +// fmt.Printf("\t\t\t\t\t%s\n", lvl3) + +// for name, met := range m.Metrics { +// mt := sel3.metrics[met.Offset] + +// fmt.Printf("\t\t\t\t\t\t%s\n", name) + +// fmt.Printf("\t\t\t\t\t\t") + +// for mt != nil { +// // if name == "clock" { +// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) + +// mt = mt.prev +// } +// fmt.Printf("\n") + +// } + +// // for i, _ := range sel3.metrics { +// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i)) +// // } +// } +// } +// } +// } + +// } + +func getName(m *MemoryStore, i int) string { + for key, val := range m.Metrics { + if val.Offset == i { + return key + } + } + return "" +} + +func Retention(wg *sync.WaitGroup, ctx context.Context) { + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d, err := time.ParseDuration(config.Keys.RetentionInMemory) + if err != nil { + log.Fatal(err) + } + if d <= 0 { + return + } + + ticks := func() <-chan time.Time { + d := d / 2 + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + t := time.Now().Add(-d) + log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) + freed, err := ms.Free(nil, t.Unix()) + if err != nil { + log.Printf("freeing up buffers failed: %s\n", err.Error()) + } else { + log.Printf("done: %d buffers freed\n", freed) + } + } + } + }() +} + +// Write all values in `metrics` to the level specified by `selector` for time `ts`. +// Look at `findLevelOrCreate` for how selectors work. 
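+//
+// A usage sketch; the cluster and host names are placeholders. The
+// MetricConfig of each Metric is looked up by name if it is left empty:
+//
+//	ms := GetMemoryStore()
+//	err := ms.Write([]string{"emmy", "host123"}, time.Now().Unix(), []Metric{
+//		{Name: "flops_any", Value: util.Float(42.0)},
+//	})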
+func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
+	var ok bool
+	for i, metric := range metrics {
+		if metric.MetricConfig.Frequency == 0 {
+			metric.MetricConfig, ok = m.Metrics[metric.Name]
+			if !ok {
+				metric.MetricConfig.Frequency = 0
+			}
+			metrics[i] = metric
+		}
+	}
+
+	return m.WriteToLevel(&m.root, selector, ts, metrics)
+}
+
+func (m *MemoryStore) GetLevel(selector []string) *Level {
+	return m.root.findLevelOrCreate(selector, len(m.Metrics))
+}
+
+// Assumes that `minfo` in `metrics` is filled in!
+func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error {
+	l = l.findLevelOrCreate(selector, len(m.Metrics))
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	for _, metric := range metrics {
+		if metric.MetricConfig.Frequency == 0 {
+			continue
+		}
+
+		b := l.metrics[metric.MetricConfig.Offset]
+		if b == nil {
+			// First write to this metric and level
+			b = newBuffer(ts, metric.MetricConfig.Frequency)
+			l.metrics[metric.MetricConfig.Offset] = b
+		}
+
+		nb, err := b.write(ts, metric.Value)
+		if err != nil {
+			return err
+		}
+
+		// Last write created a new buffer...
+		if b != nb {
+			l.metrics[metric.MetricConfig.Offset] = nb
+		}
+	}
+	return nil
+}
+
+// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
+// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
+// The second and third return values are the actual from/to for the data. Those can be different from
+// the range asked for if no data was available.
+func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
+	if from > to {
+		return nil, 0, 0, 0, errors.New("invalid time range")
+	}
+
+	minfo, ok := m.Metrics[metric]
+	if !ok {
+		return nil, 0, 0, 0, errors.New("unknown metric: " + metric)
+	}
+
+	n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)
+
+	err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
+		cdata, cfrom, cto, err := b.read(from, to, data)
+		if err != nil {
+			return err
+		}
+
+		if n == 0 {
+			from, to = cfrom, cto
+		} else if from != cfrom || to != cto || len(data) != len(cdata) {
+			missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
+			if missingfront != 0 {
+				return ErrDataDoesNotAlign
+			}
+
+			newlen := len(cdata) - missingback
+			if newlen < 1 {
+				return ErrDataDoesNotAlign
+			}
+			cdata = cdata[0:newlen]
+			if len(cdata) != len(data) {
+				return ErrDataDoesNotAlign
+			}
+
+			from, to = cfrom, cto
+		}
+
+		data = cdata
+		n += 1
+		return nil
+	})
+
+	if err != nil {
+		return nil, 0, 0, 0, err
+	} else if n == 0 {
+		return nil, 0, 0, 0, errors.New("metric or host not found")
+	} else if n > 1 {
+		if minfo.Aggregation == config.AvgAggregation {
+			normalize := 1. / util.Float(n)
+			for i := 0; i < len(data); i++ {
+				data[i] *= normalize
+			}
+		} else if minfo.Aggregation != config.SumAggregation {
+			return nil, 0, 0, 0, errors.New("invalid aggregation")
+		}
+	}
+
+	data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution)
+	if err != nil {
+		return nil, 0, 0, 0, err
+	}
+
+	return data, from, to, resolution, nil
+}
+
+// Release all buffers for the selected level and all its children that contain only
+// values older than `t`.
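+//
+// Sketch of a manual cleanup; the Retention loop above does essentially the
+// same thing, using the configured in-memory retention instead of 12h:
+//
+//	cutoff := time.Now().Add(-12 * time.Hour).Unix()
+//	freed, err := ms.Free(nil, cutoff)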
+func (m *MemoryStore) Free(selector []string, t int64) (int, error) { + return m.GetLevel(selector).free(t) +} + +func (m *MemoryStore) FreeAll() error { + for k := range m.root.children { + delete(m.root.children, k) + } + + return nil +} + +func (m *MemoryStore) SizeInBytes() int64 { + return m.root.sizeInBytes() +} + +// Given a selector, return a list of all children of the level selected. +func (m *MemoryStore) ListChildren(selector []string) []string { + lvl := &m.root + for lvl != nil && len(selector) != 0 { + lvl.lock.RLock() + next := lvl.children[selector[0]] + lvl.lock.RUnlock() + lvl = next + selector = selector[1:] + } + + if lvl == nil { + return nil + } + + lvl.lock.RLock() + defer lvl.lock.RUnlock() + + children := make([]string, 0, len(lvl.children)) + for child := range lvl.children { + children = append(children, child) + } + + return children +} diff --git a/internal/memorystore/stats.go b/internal/memorystore/stats.go new file mode 100644 index 0000000..6682d62 --- /dev/null +++ b/internal/memorystore/stats.go @@ -0,0 +1,120 @@ +package memorystore + +import ( + "errors" + "math" + + "github.com/ClusterCockpit/cc-lib/util" + "github.com/ClusterCockpit/cc-metric-store/internal/config" +) + +type Stats struct { + Samples int + Avg util.Float + Min util.Float + Max util.Float +} + +func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { + if from < b.start { + if b.prev != nil { + return b.prev.stats(from, to) + } + from = b.start + } + + // TODO: Check if b.closed and if so and the full buffer is queried, + // use b.statistics instead of iterating over the buffer. + + samples := 0 + sum, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + + var t int64 + for t = from; t < to; t += b.frequency { + idx := int((t - b.start) / b.frequency) + if idx >= cap(b.data) { + b = b.next + if b == nil { + break + } + idx = 0 + } + + if t < b.start || idx >= len(b.data) { + continue + } + + xf := float64(b.data[idx]) + if math.IsNaN(xf) { + continue + } + + samples += 1 + sum += xf + min = math.Min(min, xf) + max = math.Max(max, xf) + } + + return Stats{ + Samples: samples, + Avg: util.Float(sum) / util.Float(samples), + Min: util.Float(min), + Max: util.Float(max), + }, from, t, nil +} + +// Returns statistics for the requested metric on the selected node/level. +// Data is aggregated to the selected level the same way as in `MemoryStore.Read`. +// If `Stats.Samples` is zero, the statistics should not be considered as valid. 
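+//
+// A usage sketch; `sel` is assumed to be a util.Selector addressing e.g. a
+// cluster and a host, and "flops_any" is a placeholder metric name:
+//
+//	stats, sfrom, sto, err := ms.Stats(sel, "flops_any", from, to)
+//	if err == nil && stats.Samples > 0 {
+//		// stats.Avg, stats.Min and stats.Max are valid for [sfrom, sto]
+//	}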
+func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error) {
+	if from > to {
+		return nil, 0, 0, errors.New("invalid time range")
+	}
+
+	minfo, ok := m.Metrics[metric]
+	if !ok {
+		return nil, 0, 0, errors.New("unknown metric: " + metric)
+	}
+
+	n, samples := 0, 0
+	avg, min, max := util.Float(0), math.MaxFloat32, -math.MaxFloat32
+	err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
+		stats, cfrom, cto, err := b.stats(from, to)
+		if err != nil {
+			return err
+		}
+
+		if n == 0 {
+			from, to = cfrom, cto
+		} else if from != cfrom || to != cto {
+			return ErrDataDoesNotAlign
+		}
+
+		samples += stats.Samples
+		avg += stats.Avg
+		min = math.Min(min, float64(stats.Min))
+		max = math.Max(max, float64(stats.Max))
+		n += 1
+		return nil
+	})
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	if n == 0 {
+		return nil, 0, 0, ErrNoData
+	}
+
+	if minfo.Aggregation == config.AvgAggregation {
+		avg /= util.Float(n)
+	} else if n > 1 && minfo.Aggregation != config.SumAggregation {
+		return nil, 0, 0, errors.New("invalid aggregation")
+	}
+
+	return &Stats{
+		Samples: samples,
+		Avg:     avg,
+		Min:     util.Float(min),
+		Max:     util.Float(max),
+	}, from, to, nil
+}
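+
+// Aggregation example with hypothetical numbers: if a host level holds no data
+// of its own and two hwthread children report Avg 2.0 and 4.0 for an
+// AvgAggregation metric, findBuffers visits both child buffers, avg accumulates
+// to 6.0 with n == 2, and the final division yields Avg 3.0. For a
+// SumAggregation metric the accumulated 6.0 would be returned unchanged.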