Reintroduce JSON support for compatibility/transition

Lou Knauer 2022-07-26 09:10:48 +02:00
parent ac5f113ccb
commit 4d35a1ed4e


@@ -1,4 +1,4 @@
-package main
+package memstore

 import (
 	"archive/zip"
@@ -18,19 +18,21 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+
+	"github.com/ClusterCockpit/cc-metric-store/internal/types"
 )

 // Whenever changed, update MarshalJSON as well!
-type CheckpointMetrics struct {
+type JSONCheckpointMetrics struct {
 	Frequency int64   `json:"frequency"`
 	Start     int64   `json:"start"`
-	Data      []Float `json:"data"`
+	Data      []types.Float `json:"data"`
 }

 // As `Float` implements a custom MarshalJSON() function,
 // serializing an array of such types has more overhead
 // than one would assume (because of extra allocations, interfaces and so on).
-func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
+func (cm *JSONCheckpointMetrics) MarshalJSON() ([]byte, error) {
 	buf := make([]byte, 0, 128+len(cm.Data)*8)
 	buf = append(buf, `{"frequency":`...)
 	buf = strconv.AppendInt(buf, cm.Frequency, 10)
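
The comment in this hunk is the whole motivation for the hand-rolled MarshalJSON. Below is a sketch of the kind of Float the code refers to, assuming (as the null-padding elsewhere in this file suggests) that NaN marks missing samples and must encode as JSON null; the real definition lives in internal/types and may differ in detail:

```go
// Sketch of a Float with a custom JSON encoding. Assumption: NaN is
// the in-memory marker for "no sample" and must serialize as null,
// since encoding/json cannot represent NaN in a plain float64.
package types

import (
	"math"
	"strconv"
)

type Float float64

var NaN Float = Float(math.NaN())

func (f Float) MarshalJSON() ([]byte, error) {
	if math.IsNaN(float64(f)) {
		return []byte("null"), nil
	}
	return strconv.AppendFloat(make([]byte, 0, 16), float64(f), 'f', 3, 64), nil
}
```

Because encoding/json invokes such a method through an interface for every element of a []Float, it pays an allocation and a method dispatch per value; that is why the checkpoint writer appends digits into a single preallocated buffer instead.
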
@@ -51,11 +53,11 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
 	return buf, nil
 }

-type CheckpointFile struct {
+type JSONCheckpointFile struct {
 	From     int64                         `json:"from"`
 	To       int64                         `json:"to"`
-	Metrics  map[string]*CheckpointMetrics `json:"metrics"`
-	Children map[string]*CheckpointFile    `json:"children"`
+	Metrics  map[string]*JSONCheckpointMetrics `json:"metrics"`
+	Children map[string]*JSONCheckpointFile    `json:"children"`
 }

 var ErrNoNewData error = errors.New("all data already archived")
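
For illustration, a per-host checkpoint built from these structs serializes to roughly the following; all metric names and values are invented, and the null entries are NaN samples as encoded by types.Float:

```json
{
  "from": 1658800000,
  "to": 1658803600,
  "metrics": {
    "flops_any": { "frequency": 60, "start": 1658800020, "data": [42.0, 41.5, null] }
  },
  "children": {
    "cpu0": {
      "from": 1658800000,
      "to": 1658803600,
      "metrics": { "flops_any": { "frequency": 60, "start": 1658800020, "data": [21.0, null, 20.5] } },
      "children": {}
    }
  }
}
```
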
@@ -74,13 +76,13 @@ func init() {
 // On a per-host basis a new JSON file is created. I have no idea if this will scale.
 // The good thing: Only a host at a time is locked, so this function can run
 // in parallel to writes/reads.
-func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
-	levels := make([]*level, 0)
+func (m *MemoryStore) ToJSONCheckpoint(dir string, from, to int64) (int, error) {
+	levels := make([]*Level, 0)
 	selectors := make([][]string, 0)
 	m.root.lock.RLock()
-	for sel1, l1 := range m.root.children {
+	for sel1, l1 := range m.root.sublevels {
 		l1.lock.RLock()
-		for sel2, l2 := range l1.children {
+		for sel2, l2 := range l1.sublevels {
 			levels = append(levels, l2)
 			selectors = append(selectors, []string{sel1, sel2})
 		}
@@ -89,7 +91,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 	m.root.lock.RUnlock()

 	type workItem struct {
-		level    *level
+		level    *Level
 		dir      string
 		selector []string
 	}
@@ -104,7 +106,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 		go func() {
 			defer wg.Done()
 			for workItem := range work {
-				if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil {
+				if err := workItem.level.toJSONCheckpoint(workItem.dir, from, to, m); err != nil {
 					if err == ErrNoNewData {
 						continue
 					}
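
This hunk and the ones around it rely on a plain Go fan-out: a buffered channel of work items, a fixed number of goroutines draining it, and ErrNoNewData treated as "skip this host" rather than a failure. A self-contained sketch of that pattern, with illustrative function and variable names:

```go
// Condensed sketch of the worker-pool pattern ToJSONCheckpoint uses:
// a buffered channel fans work out to numWorkers goroutines, and a
// sentinel error is treated as "nothing to do", not as a failure.
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errNoNewData = errors.New("all data already archived")

func runCheckpointWorkers(items []string, numWorkers int, do func(string) error) (int, int) {
	work := make(chan string, numWorkers)
	var wg sync.WaitGroup
	n, errs := int32(0), int32(0)

	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func() {
			defer wg.Done()
			for item := range work {
				if err := do(item); err != nil {
					if err == errNoNewData {
						continue // nothing new to checkpoint for this host
					}
					atomic.AddInt32(&errs, 1)
					continue
				}
				atomic.AddInt32(&n, 1)
			}
		}()
	}

	for _, it := range items {
		work <- it
	}
	close(work)
	wg.Wait()
	return int(n), int(errs)
}

func main() {
	n, errs := runCheckpointWorkers([]string{"host1", "host2"}, 2, func(string) error { return nil })
	fmt.Println(n, errs)
}
```
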
@@ -136,54 +138,42 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 	return int(n), nil
 }

-func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) {
+func (l *Level) toJSONCheckpointFile(from, to int64, m *MemoryStore) (*JSONCheckpointFile, error) {
 	l.lock.RLock()
 	defer l.lock.RUnlock()

-	retval := &CheckpointFile{
+	retval := &JSONCheckpointFile{
 		From:     from,
 		To:       to,
-		Metrics:  make(map[string]*CheckpointMetrics),
-		Children: make(map[string]*CheckpointFile),
+		Metrics:  make(map[string]*JSONCheckpointMetrics),
+		Children: make(map[string]*JSONCheckpointFile),
 	}

 	for metric, minfo := range m.metrics {
-		b := l.metrics[minfo.offset]
+		b := l.metrics[minfo.Offset]
 		if b == nil {
 			continue
 		}

-		allArchived := true
-		b.iterFromTo(from, to, func(b *buffer) error {
-			if !b.archived {
-				allArchived = false
-			}
-			return nil
-		})
-
-		if allArchived {
-			continue
-		}
-
-		data := make([]Float, (to-from)/b.frequency+1)
+		data := make([]types.Float, (to-from)/b.frequency+1)
 		data, start, end, err := b.read(from, to, data)
 		if err != nil {
 			return nil, err
 		}

 		for i := int((end - start) / b.frequency); i < len(data); i++ {
-			data[i] = NaN
+			data[i] = types.NaN
 		}

-		retval.Metrics[metric] = &CheckpointMetrics{
+		retval.Metrics[metric] = &JSONCheckpointMetrics{
 			Frequency: b.frequency,
 			Start:     start,
 			Data:      data,
 		}
 	}

-	for name, child := range l.children {
-		val, err := child.toCheckpointFile(from, to, m)
+	for name, child := range l.sublevels {
+		val, err := child.toJSONCheckpointFile(from, to, m)
 		if err != nil {
 			return nil, err
 		}
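
The loop above pads the tail of the preallocated slice: b.read may cover less than the requested [from, to] window, and every slot after the last sample actually read is set to NaN. A standalone sketch of that step, with invented numbers:

```go
// Sketch of the tail-padding done in toJSONCheckpointFile: keep
// len(data) == (to-from)/frequency + 1, and mark every slot past the
// last written sample as NaN ("no data"). Values are illustrative.
package main

import (
	"fmt"
	"math"
)

func padTail(data []float64, start, end, frequency int64) {
	for i := int((end - start) / frequency); i < len(data); i++ {
		data[i] = math.NaN()
	}
}

func main() {
	from, to, freq := int64(0), int64(300), int64(60)
	data := make([]float64, (to-from)/freq+1) // 6 slots
	// pretend read() only produced samples covering [0, 180)
	padTail(data, 0, 180, freq)
	fmt.Println(data) // the last 3 slots are NaN
}
```
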
@@ -200,8 +190,8 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
 	return retval, nil
 }

-func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
-	cf, err := l.toCheckpointFile(from, to, m)
+func (l *Level) toJSONCheckpoint(dir string, from, to int64, m *MemoryStore) error {
+	cf, err := l.toJSONCheckpointFile(from, to, m)
 	if err != nil {
 		return err
 	}
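
The remainder of toJSONCheckpoint lies outside this hunk; it serializes cf to disk. As a sketch of what that amounts to, assuming checkpoints are named after their from timestamp (an assumption, the naming scheme is not visible in this diff):

```go
package memstore

import (
	"bufio"
	"encoding/json"
	"os"
	"path/filepath"
	"strconv"
)

// writeCheckpoint is a hypothetical sketch of the write step that
// follows this hunk: encode the JSONCheckpointFile into
// "<dir>/<from>.json" through a buffered writer. The "<from>.json"
// naming is an assumption, not something this diff shows.
func writeCheckpoint(dir string, from int64, cf *JSONCheckpointFile) error {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return err
	}
	f, err := os.Create(filepath.Join(dir, strconv.FormatInt(from, 10)+".json"))
	if err != nil {
		return err
	}
	defer f.Close()

	bw := bufio.NewWriter(f)
	if err := json.NewEncoder(bw).Encode(cf); err != nil {
		return err
	}
	return bw.Flush()
}
```
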
@@ -234,7 +224,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 // Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
 // This function can only be called once and before the very first write or read.
 // Different host's data is loaded to memory in parallel.
-func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
+func (m *MemoryStore) FromJSONCheckpoint(dir string, from int64) (int, error) {
 	var wg sync.WaitGroup
 	work := make(chan [2]string, NumWorkers)
 	n, errs := int32(0), int32(0)
@@ -245,7 +235,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
 		defer wg.Done()
 		for host := range work {
 			lvl := m.root.findLevelOrCreate(host[:], len(m.metrics))
-			nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
+			nn, err := lvl.fromJSONCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
 			if err != nil {
 				log.Fatalf("error while loading checkpoints: %s", err.Error())
 				atomic.AddInt32(&errs, 1)
@@ -302,54 +292,53 @@ done:
 	return int(n), nil
 }

-func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
+func (l *Level) loadJSONCheckpointFile(cf *JSONCheckpointFile, m *MemoryStore) error {
 	for name, metric := range cf.Metrics {
 		n := len(metric.Data)
-		b := &buffer{
+		c := &chunk{
 			frequency: metric.Frequency,
 			start:     metric.Start,
 			data:      metric.Data[0:n:n], // Space is wasted here :(
 			prev:      nil,
 			next:      nil,
-			archived:  true,
+			checkpointed: true,
 		}
-		b.close()

-		minfo, ok := m.metrics[name]
+		mc, ok := m.metrics[name]
 		if !ok {
 			continue
 			// return errors.New("Unkown metric: " + name)
 		}

-		prev := l.metrics[minfo.offset]
+		prev := l.metrics[mc.Offset]
 		if prev == nil {
-			l.metrics[minfo.offset] = b
+			l.metrics[mc.Offset] = c
 		} else {
-			if prev.start > b.start {
+			if prev.start > c.start {
 				return errors.New("wooops")
 			}

-			b.prev = prev
-			prev.next = b
+			c.prev = prev
+			prev.next = c
 		}
-		l.metrics[minfo.offset] = b
+		l.metrics[mc.Offset] = c
 	}

-	if len(cf.Children) > 0 && l.children == nil {
-		l.children = make(map[string]*level)
+	if len(cf.Children) > 0 && l.sublevels == nil {
+		l.sublevels = make(map[string]*Level)
 	}

 	for sel, childCf := range cf.Children {
-		child, ok := l.children[sel]
+		child, ok := l.sublevels[sel]
 		if !ok {
-			child = &level{
-				metrics:  make([]*buffer, len(m.metrics)),
-				children: nil,
+			child = &Level{
+				metrics:   make([]*chunk, len(m.metrics)),
+				sublevels: nil,
 			}
-			l.children[sel] = child
+			l.sublevels[sel] = child
 		}

-		if err := child.loadFile(childCf, m); err != nil {
+		if err := child.loadJSONCheckpointFile(childCf, m); err != nil {
 			return err
 		}
 	}
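
loadJSONCheckpointFile threads each decoded chunk onto the newest end of a per-metric doubly linked list, rejecting checkpoints that arrive out of order (the "wooops" branch). A minimal sketch of just that linking step, reusing the field names from the diff; everything else is illustrative:

```go
// Minimal sketch of the chunk linking above: per metric, chunks form a
// doubly linked list ordered by start time, and the stored pointer
// (l.metrics[mc.Offset] in the diff) always refers to the newest chunk.
package memstore

import "errors"

type chunk struct {
	start      int64
	prev, next *chunk
}

// appendChunk links c after the current head and returns the new head.
func appendChunk(head, c *chunk) (*chunk, error) {
	if head == nil {
		return c, nil
	}
	if head.start > c.start {
		// checkpoint files must be loaded oldest-first
		return nil, errors.New("chunk out of order")
	}
	c.prev = head
	head.next = c
	return c, nil
}
```
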
@@ -357,7 +346,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
 	return nil
 }

-func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
+func (l *Level) fromJSONCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
 	direntries, err := os.ReadDir(dir)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -371,18 +360,18 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
 	filesLoaded := 0
 	for _, e := range direntries {
 		if e.IsDir() {
-			child := &level{
-				metrics:  make([]*buffer, len(m.metrics)),
-				children: make(map[string]*level),
+			child := &Level{
+				metrics:   make([]*chunk, len(m.metrics)),
+				sublevels: make(map[string]*Level),
 			}

-			files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
+			files, err := child.fromJSONCheckpoint(path.Join(dir, e.Name()), from, m)
 			filesLoaded += files
 			if err != nil {
 				return filesLoaded, err
 			}

-			l.children[e.Name()] = child
+			l.sublevels[e.Name()] = child
 		} else if strings.HasSuffix(e.Name(), ".json") {
 			jsonFiles = append(jsonFiles, e)
 		} else {
@@ -403,7 +392,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
 	defer f.Close()

 	br := bufio.NewReader(f)
-	cf := &CheckpointFile{}
+	cf := &JSONCheckpointFile{}
 	if err = json.NewDecoder(br).Decode(cf); err != nil {
 		return filesLoaded, err
 	}
@@ -412,7 +401,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
 		continue
 	}

-	if err = l.loadFile(cf, m); err != nil {
+	if err = l.loadJSONCheckpointFile(cf, m); err != nil {
 		return filesLoaded, err
 	}
@@ -480,12 +469,13 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
 		cluster, host string
 	}

+	numWorkers := 2
 	var wg sync.WaitGroup
 	n, errs := int32(0), int32(0)
-	work := make(chan workItem, NumWorkers)
+	work := make(chan workItem, numWorkers)

-	wg.Add(NumWorkers)
-	for worker := 0; worker < NumWorkers; worker++ {
+	wg.Add(numWorkers)
+	for worker := 0; worker < numWorkers; worker++ {
 		go func() {
 			defer wg.Done()
 			for workItem := range work {
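
ArchiveCheckpoints, whose body is mostly outside this diff, packs finished checkpoint files into archives via the archive/zip import at the top of the file; the hunk above merely caps its concurrency at a hardcoded two workers instead of the global NumWorkers. As context, a minimal, illustrative sketch of the zip-writing step (paths and file names are invented):

```go
// Illustrative sketch of writing checkpoint files into a zip archive
// with archive/zip. This is not ArchiveCheckpoints' actual body, which
// this diff does not show.
package main

import (
	"archive/zip"
	"io"
	"log"
	"os"
	"path/filepath"
)

func zipFiles(zipPath string, files []string) error {
	f, err := os.Create(zipPath)
	if err != nil {
		return err
	}
	defer f.Close()

	zw := zip.NewWriter(f)
	defer zw.Close()

	for _, file := range files {
		src, err := os.Open(file)
		if err != nil {
			return err
		}
		w, err := zw.Create(filepath.Base(file))
		if err != nil {
			src.Close()
			return err
		}
		if _, err := io.Copy(w, src); err != nil {
			src.Close()
			return err
		}
		src.Close()
	}
	return nil
}

func main() {
	// hypothetical checkpoint file name, for illustration only
	if err := zipFiles("checkpoints.zip", []string{"1658800000.json"}); err != nil {
		log.Fatal(err)
	}
}
```
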