mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-06-18 13:33:49 +02:00
Fix to Avro features
This commit is contained in:
parent
968940da1f
commit
aa0a3673f2
@ -198,8 +198,6 @@ func decodeLine(dec *lineprotocol.Decoder,
|
|||||||
var lvl *memorystore.Level = nil
|
var lvl *memorystore.Level = nil
|
||||||
prevCluster, prevHost := "", ""
|
prevCluster, prevHost := "", ""
|
||||||
|
|
||||||
var AvroBuf avro.AvroStruct
|
|
||||||
|
|
||||||
var ok bool
|
var ok bool
|
||||||
for dec.Next() {
|
for dec.Next() {
|
||||||
rawmeasurement, err := dec.Measurement()
|
rawmeasurement, err := dec.Measurement()
|
||||||
@ -333,14 +331,13 @@ func decodeLine(dec *lineprotocol.Decoder,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if config.Keys.Checkpoints.FileFormat != "json" {
|
if config.Keys.Checkpoints.FileFormat != "json" {
|
||||||
AvroBuf.MetricName = string(metricBuf)
|
avro.LineProtocolMessages <- &avro.AvroStruct{
|
||||||
AvroBuf.Cluster = cluster
|
MetricName: string(metricBuf),
|
||||||
AvroBuf.Node = host
|
Cluster: cluster,
|
||||||
AvroBuf.Selector = selector
|
Node: host,
|
||||||
AvroBuf.Value = metric.Value
|
Selector: append([]string{}, selector...),
|
||||||
AvroBuf.Timestamp = t.Unix()
|
Value: metric.Value,
|
||||||
|
Timestamp: t.Unix()}
|
||||||
avro.LineProtocolMessages <- AvroBuf
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
|
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
|
||||||
|
@ -137,7 +137,7 @@ func getTimestamp(dir string) int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
||||||
updateTime := time.Now().Add(-interval).Unix()
|
updateTime := time.Now().Add(-interval - time.Duration(CheckpointBufferMinutes) + 1).Unix()
|
||||||
|
|
||||||
if maxTs < updateTime {
|
if maxTs < updateTime {
|
||||||
return 0
|
return 0
|
||||||
|
@ -58,7 +58,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
if avroLevel == nil {
|
if avroLevel == nil {
|
||||||
fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
|
fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
|
||||||
}
|
}
|
||||||
copy(oldSelector, selector)
|
oldSelector = append([]string{}, selector...)
|
||||||
}
|
}
|
||||||
|
|
||||||
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
|
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
|
||||||
|
@ -6,9 +6,8 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
var LineProtocolMessages = make(chan AvroStruct)
|
var LineProtocolMessages = make(chan *AvroStruct)
|
||||||
var Delimiter = "ZZZZZ"
|
var Delimiter = "ZZZZZ"
|
||||||
var AvroCounter = 0
|
|
||||||
|
|
||||||
// CheckpointBufferMinutes should always be in minutes.
|
// CheckpointBufferMinutes should always be in minutes.
|
||||||
// Its controls the amount of data to hold for given amount of time.
|
// Its controls the amount of data to hold for given amount of time.
|
||||||
@ -120,13 +119,13 @@ func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int
|
|||||||
|
|
||||||
// Iterate over timestamps and choose the one which is within range.
|
// Iterate over timestamps and choose the one which is within range.
|
||||||
// Since its epoch time, we check if the difference is less than 60 seconds.
|
// Since its epoch time, we check if the difference is less than 60 seconds.
|
||||||
for ts := range l.data {
|
for ts, dat := range l.data {
|
||||||
if _, ok := l.data[ts][metricName]; ok {
|
if _, ok := dat[metricName]; ok {
|
||||||
// If the metric is already present, we can skip it
|
// If the metric is already present, we can skip it
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if (ts - timestamp) < int64(Freq) {
|
if Abs(ts-timestamp) < int64(Freq) {
|
||||||
l.data[ts][metricName] = value
|
dat[metricName] = value
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,3 +134,11 @@ func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int
|
|||||||
func GetAvroStore() *AvroStore {
|
func GetAvroStore() *AvroStore {
|
||||||
return &avroStore
|
return &avroStore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Abs returns the absolute value of x.
|
||||||
|
func Abs(x int64) int64 {
|
||||||
|
if x < 0 {
|
||||||
|
return -x
|
||||||
|
}
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
@ -439,6 +439,10 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
|
|||||||
br := bufio.NewReader(f)
|
br := bufio.NewReader(f)
|
||||||
|
|
||||||
fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:]
|
fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:]
|
||||||
|
from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error converting timestamp from the avro file : %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64)
|
resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -475,7 +479,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
|
|||||||
recordCounter += 1
|
recordCounter += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
to := (from + (recordCounter / (60 / resolution) * 60))
|
to := (from_timestamp + (recordCounter / (60 / resolution) * 60))
|
||||||
if to < from {
|
if to < from {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -508,12 +512,12 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
leafMetricName := subString[len(subString)-1]
|
leafMetricName := subString[len(subString)-1]
|
||||||
err = lvl.createBuffer(m, leafMetricName, floatArray, from, resolution)
|
err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = l.createBuffer(m, metricName, floatArray, from, resolution)
|
err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
||||||
}
|
}
|
||||||
@ -551,6 +555,15 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray util.
|
|||||||
|
|
||||||
b.prev = prev
|
b.prev = prev
|
||||||
prev.next = b
|
prev.next = b
|
||||||
|
|
||||||
|
missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency))
|
||||||
|
if missingCount > 0 {
|
||||||
|
missingCount /= int(b.frequency)
|
||||||
|
|
||||||
|
for range missingCount {
|
||||||
|
prev.data = append(prev.data, util.NaN)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
l.metrics[minfo.Offset] = b
|
l.metrics[minfo.Offset] = b
|
||||||
|
|
||||||
|
@ -81,11 +81,13 @@ func Shutdown() {
|
|||||||
var files int
|
var files int
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if config.Keys.Checkpoints.FileFormat == "json" {
|
|
||||||
ms := GetMemoryStore()
|
ms := GetMemoryStore()
|
||||||
|
|
||||||
|
if config.Keys.Checkpoints.FileFormat == "json" {
|
||||||
files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||||
} else {
|
} else {
|
||||||
files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
|
files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
|
||||||
|
close(avro.LineProtocolMessages)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -93,6 +95,76 @@ func Shutdown() {
|
|||||||
}
|
}
|
||||||
log.Printf("Done! (%d files written)\n", files)
|
log.Printf("Done! (%d files written)\n", files)
|
||||||
|
|
||||||
|
// ms.PrintHeirarchy()
|
||||||
|
}
|
||||||
|
|
||||||
|
// func (m *MemoryStore) PrintHeirarchy() {
|
||||||
|
// m.root.lock.Lock()
|
||||||
|
// defer m.root.lock.Unlock()
|
||||||
|
|
||||||
|
// fmt.Printf("Root : \n")
|
||||||
|
|
||||||
|
// for lvl1, sel1 := range m.root.children {
|
||||||
|
// fmt.Printf("\t%s\n", lvl1)
|
||||||
|
// for lvl2, sel2 := range sel1.children {
|
||||||
|
// fmt.Printf("\t\t%s\n", lvl2)
|
||||||
|
// if lvl1 == "fritz" && lvl2 == "f0201" {
|
||||||
|
|
||||||
|
// for name, met := range m.Metrics {
|
||||||
|
// mt := sel2.metrics[met.Offset]
|
||||||
|
|
||||||
|
// fmt.Printf("\t\t\t\t%s\n", name)
|
||||||
|
// fmt.Printf("\t\t\t\t")
|
||||||
|
|
||||||
|
// for mt != nil {
|
||||||
|
// // if name == "cpu_load" {
|
||||||
|
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
|
||||||
|
// // }
|
||||||
|
// mt = mt.prev
|
||||||
|
// }
|
||||||
|
// fmt.Printf("\n")
|
||||||
|
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// for lvl3, sel3 := range sel2.children {
|
||||||
|
// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" {
|
||||||
|
|
||||||
|
// fmt.Printf("\t\t\t\t\t%s\n", lvl3)
|
||||||
|
|
||||||
|
// for name, met := range m.Metrics {
|
||||||
|
// mt := sel3.metrics[met.Offset]
|
||||||
|
|
||||||
|
// fmt.Printf("\t\t\t\t\t\t%s\n", name)
|
||||||
|
|
||||||
|
// fmt.Printf("\t\t\t\t\t\t")
|
||||||
|
|
||||||
|
// for mt != nil {
|
||||||
|
// // if name == "clock" {
|
||||||
|
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
|
||||||
|
|
||||||
|
// mt = mt.prev
|
||||||
|
// }
|
||||||
|
// fmt.Printf("\n")
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // for i, _ := range sel3.metrics {
|
||||||
|
// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i))
|
||||||
|
// // }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
func getName(m *MemoryStore, i int) string {
|
||||||
|
for key, val := range m.Metrics {
|
||||||
|
if val.Offset == i {
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user