Repository: https://github.com/ClusterCockpit/cc-metric-store.git

commit 8098417f78 (parent 06f2f06bdb)

    fix and support for avro shutdown writer
@@ -91,7 +91,7 @@ func main() {
 	ctx, shutdown := context.WithCancel(context.Background())
 
 	var wg sync.WaitGroup
-	wg.Add(3)
+	wg.Add(4)
 
 	memorystore.Retention(&wg, ctx)
 	memorystore.Checkpointing(&wg, ctx)
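The WaitGroup count grows from 3 to 4 because the Avro data-staging worker (DataStaging, changed further down) is now a fourth long-running goroutine that main must wait for before exiting. A minimal sketch of the pattern, assuming only that every worker pairs the Add slot with exactly one Done:

```go
package main

import (
	"context"
	"sync"
)

func main() {
	ctx, shutdown := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(4) // one slot per background worker (retention, checkpointing, ..., avro staging)

	for i := 0; i < 4; i++ {
		go func() {
			defer wg.Done() // every worker releases exactly one slot
			<-ctx.Done()    // stand-in for the real worker loop
		}()
	}

	shutdown() // in the real server this happens on shutdown, not immediately
	wg.Wait()  // returns only after all four workers have finished
}
```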
@@ -23,7 +23,7 @@ var NumWorkers int = 4
 
 var ErrNoNewData error = errors.New("no data in the pool")
 
-func (as *AvroStore) ToCheckpoint(dir string) (int, error) {
+func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
 	levels := make([]*AvroLevel, 0)
 	selectors := make([][]string, 0)
 	as.root.lock.RLock()
@@ -62,7 +62,7 @@ func (as *AvroStore) ToCheckpoint(dir string) (int, error) {
 		for workItem := range work {
 			var from int64 = getTimestamp(workItem.dir)
 
-			if err := workItem.level.toCheckpoint(workItem.dir, from); err != nil {
+			if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil {
 				if err == ErrNoNewData {
 					continue
 				}
@@ -145,7 +145,7 @@ func getTimestamp(dir string) int64 {
 	return maxTs
 }
 
-func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
+func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
 	l.lock.Lock()
 	defer l.lock.Unlock()
 
@@ -179,10 +179,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 		err = os.MkdirAll(path.Dir(dir), 0o755)
 		if err != nil {
 			return fmt.Errorf("failed to create directory: %v", err)
-			// f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644)
-			// if err != nil {
-			// 	return fmt.Errorf("failed to create new avro file: %v", err)
-			// }
 		}
 	} else if fp_, err := os.Stat(filePath); fp_.Size() != 0 || errors.Is(err, os.ErrNotExist) {
 		f, err = os.Open(filePath)
@@ -197,11 +193,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 		schema = codec.Schema()
 
 		f.Close()
-
-		// f, err = os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644)
-		// if err != nil {
-		// 	return fmt.Errorf("failed to create file: %v", err)
-		// }
 	}
 	f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644)
 	if err != nil {
@@ -211,8 +202,11 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 
 	time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
 
+	if dumpAll {
+		time_ref = time.Now().Unix()
+	}
+
 	if len(l.data) == 0 {
 		fmt.Printf("no data in the pool\n")
 		// filepath contains the resolution
 		int_res, _ := strconv.Atoi(path.Base(dir))
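The dumpAll flag threaded through ToCheckpoint and toCheckpoint only changes the flush cutoff: on a regular tick, samples younger than the CheckpointBufferMinutes window stay buffered in memory, while on shutdown everything up to "now" is written out. A minimal sketch of that cutoff logic, assuming a hypothetical flushOlderThan helper in place of the real Avro encoding:

```go
package avrosketch

import "time"

// CheckpointBufferMinutes mirrors the setting referenced in the diff;
// the value here is only a placeholder.
const CheckpointBufferMinutes = 3

// cutoff returns the timestamp up to which buffered samples get flushed.
// dumpAll=false keeps the most recent buffer window in memory;
// dumpAll=true (the shutdown path) flushes everything up to now.
func cutoff(dumpAll bool) int64 {
	timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
	if dumpAll {
		timeRef = time.Now().Unix()
	}
	return timeRef
}

// flushOlderThan is a hypothetical stand-in for the Avro writer: it keeps
// samples newer than ref buffered and would encode the rest to a file.
func flushOlderThan(pool map[int64]float64, ref int64) map[int64]float64 {
	kept := make(map[int64]float64, len(pool))
	for ts, val := range pool {
		if ts > ref {
			kept[ts] = val // too fresh, keep buffering
			continue
		}
		_ = val // the real code appends this sample to the Avro checkpoint file
	}
	return kept
}
```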
@@ -11,12 +11,13 @@ import (
 
 func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
 
-	if config.Keys.Checkpoints.FileFormat == "json" {
-		wg.Done()
-	}
-
 	// AvroPool is a pool of Avro writers.
 	go func() {
+		if config.Keys.Checkpoints.FileFormat == "json" {
+			wg.Done() // Mark this goroutine as done
+			return    // Exit the goroutine
+		}
+
 		defer wg.Done()
 
 		var avroLevel *AvroLevel
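Moving the JSON check inside the goroutine appears to be the actual fix in DataStaging: previously the early wg.Done() ran and the goroutine was still started with its own deferred wg.Done(), so the counter risked being decremented twice for a single worker slot. With the check inside, each Add is matched by exactly one Done whether the worker exits immediately or runs until cancellation. A small sketch of the corrected pairing, with the staging loop reduced to a wait on the context:

```go
package avrosketch

import (
	"context"
	"sync"
)

// dataStaging sketches the corrected pairing of Add and Done: the format
// check lives inside the goroutine, so the worker either signals Done and
// exits immediately (JSON checkpoints) or defers Done until the real
// staging loop finishes.
func dataStaging(wg *sync.WaitGroup, ctx context.Context, fileFormat string) {
	go func() {
		if fileFormat == "json" {
			wg.Done() // nothing to stage when checkpoints are plain JSON
			return
		}
		defer wg.Done()

		// The real worker receives metrics on a channel and sorts them into
		// the Avro level hierarchy; this sketch only waits for cancellation.
		<-ctx.Done()
	}()
}
```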
@@ -85,20 +85,12 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 		defer wg.Done()
 		d, _ := time.ParseDuration("1m")
 
-		d_cp, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
-		if err != nil {
-			log.Fatal(err)
-		}
-		if d_cp <= 0 {
-			return
-		}
-
 		select {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute):
 			// This is the first tick untill we collect the data for given minutes.
-			avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir)
+			avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
 		}
 
 		ticks := func() <-chan time.Time {
@@ -108,22 +100,13 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 			return time.NewTicker(d).C
 		}()
 
-		ticks_cp := func() <-chan time.Time {
-			if d_cp <= 0 {
-				return nil
-			}
-			return time.NewTicker(d_cp).C
-		}()
-
 		for {
 			select {
 			case <-ctx.Done():
 				return
-			case <-ticks_cp:
-				lastCheckpoint = time.Now()
 			case <-ticks:
 				// Regular ticks of 1 minute to write data.
-				avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir)
+				avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
 			}
 		}
 	}()
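Within the Avro branch of Checkpointing, the separate checkpoint-interval ticker (d_cp / ticks_cp) is dropped: regular flushes now come only from the one-minute ticker and always pass dumpAll=false, while the complete dump is deferred to Shutdown (next hunk). A minimal sketch of the loop that remains, with a flush callback standing in for avro.GetAvroStore().ToCheckpoint:

```go
package avrosketch

import (
	"context"
	"sync"
	"time"
)

// checkpointLoop sketches what remains after the diff: a single one-minute
// ticker drives regular flushes with dumpAll=false, and cancellation stops
// the loop; the closing dumpAll=true flush is left to the shutdown path.
func checkpointLoop(wg *sync.WaitGroup, ctx context.Context, flush func(dumpAll bool)) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				flush(false) // regular tick: keep the freshest buffer window in memory
			}
		}
	}()
}
```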
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ClusterCockpit/cc-metric-store/internal/avro"
 	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/util"
 	"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
@@ -76,13 +77,22 @@ func GetMemoryStore() *MemoryStore {
 }
 
 func Shutdown() {
-	ms := GetMemoryStore()
 	log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)
-	files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
+	var files int
+	var err error
+
+	if config.Keys.Checkpoints.FileFormat == "json" {
+		ms := GetMemoryStore()
+		files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
+	} else {
+		files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
+	}
+
 	if err != nil {
 		log.Printf("Writing checkpoint failed: %s\n", err.Error())
 	}
 	log.Printf("Done! (%d files written)\n", files)
+
 }
 
 func Retention(wg *sync.WaitGroup, ctx context.Context) {
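Shutdown now chooses the writer that matches the configured checkpoint format: JSON checkpoints still go through the memory store's ToCheckpoint, while the Avro path flushes with dumpAll=true so nothing left in the staging buffers is lost. The diff itself does not show how Shutdown is triggered; a typical wiring, shown purely as an assumption, cancels the worker context on SIGINT/SIGTERM and then writes the final checkpoint:

```go
package shutdownsketch

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

// waitForShutdown is a hypothetical illustration (not taken from this diff)
// of how a final dump-everything checkpoint is usually triggered: cancel the
// worker context on SIGINT/SIGTERM, then write the closing checkpoint.
func waitForShutdown(cancel context.CancelFunc, writeFinalCheckpoint func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	sig := <-sigs
	log.Printf("received %v, shutting down", sig)

	cancel()               // stops retention, checkpointing and staging workers
	writeFinalCheckpoint() // e.g. memorystore.Shutdown(), which uses dumpAll=true on the Avro path
}
```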