diff --git a/cmd/cc-metric-store/main.go b/cmd/cc-metric-store/main.go index e7e8637..fe26ca1 100644 --- a/cmd/cc-metric-store/main.go +++ b/cmd/cc-metric-store/main.go @@ -91,7 +91,7 @@ func main() { ctx, shutdown := context.WithCancel(context.Background()) var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) memorystore.Retention(&wg, ctx) memorystore.Checkpointing(&wg, ctx) diff --git a/internal/avro/avroCheckpoint.go b/internal/avro/avroCheckpoint.go index 1b0bfe6..4217730 100644 --- a/internal/avro/avroCheckpoint.go +++ b/internal/avro/avroCheckpoint.go @@ -23,7 +23,7 @@ var NumWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") -func (as *AvroStore) ToCheckpoint(dir string) (int, error) { +func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) selectors := make([][]string, 0) as.root.lock.RLock() @@ -62,7 +62,7 @@ func (as *AvroStore) ToCheckpoint(dir string) (int, error) { for workItem := range work { var from int64 = getTimestamp(workItem.dir) - if err := workItem.level.toCheckpoint(workItem.dir, from); err != nil { + if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil { if err == ErrNoNewData { continue } @@ -145,7 +145,7 @@ func getTimestamp(dir string) int64 { return maxTs } -func (l *AvroLevel) toCheckpoint(dir string, from int64) error { +func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { l.lock.Lock() defer l.lock.Unlock() @@ -179,10 +179,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error { err = os.MkdirAll(path.Dir(dir), 0o755) if err != nil { return fmt.Errorf("failed to create directory: %v", err) - // f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644) - // if err != nil { - // return fmt.Errorf("failed to create new avro file: %v", err) - // } } } else if fp_, err := os.Stat(filePath); fp_.Size() != 0 || errors.Is(err, os.ErrNotExist) { f, err = os.Open(filePath) @@ -197,11 +193,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error { schema = codec.Schema() f.Close() - - // f, err = os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644) - // if err != nil { - // return fmt.Errorf("failed to create file: %v", err) - // } } f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644) if err != nil { @@ -211,8 +202,11 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error { time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() + if dumpAll { + time_ref = time.Now().Unix() + } + if len(l.data) == 0 { - fmt.Printf("no data in the pool\n") // filepath contains the resolution int_res, _ := strconv.Atoi(path.Base(dir)) diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go index b2262bd..e91bb5d 100644 --- a/internal/avro/avroHelper.go +++ b/internal/avro/avroHelper.go @@ -11,12 +11,13 @@ import ( func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - if config.Keys.Checkpoints.FileFormat == "json" { - wg.Done() - } - // AvroPool is a pool of Avro writers. go func() { + if config.Keys.Checkpoints.FileFormat == "json" { + wg.Done() // Mark this goroutine as done + return // Exit the goroutine + } + defer wg.Done() var avroLevel *AvroLevel diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 2b161e7..a3ebf7e 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -85,20 +85,12 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, _ := time.ParseDuration("1m") - d_cp, err := time.ParseDuration(config.Keys.Checkpoints.Interval) - if err != nil { - log.Fatal(err) - } - if d_cp <= 0 { - return - } - select { case <-ctx.Done(): return case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute): // This is the first tick untill we collect the data for given minutes. - avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir) + avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false) } ticks := func() <-chan time.Time { @@ -108,22 +100,13 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { return time.NewTicker(d).C }() - ticks_cp := func() <-chan time.Time { - if d_cp <= 0 { - return nil - } - return time.NewTicker(d_cp).C - }() - for { select { case <-ctx.Done(): return - case <-ticks_cp: - lastCheckpoint = time.Now() case <-ticks: // Regular ticks of 1 minute to write data. - avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir) + avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false) } } }() diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 229ab00..4932efb 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/avro" "github.com/ClusterCockpit/cc-metric-store/internal/config" "github.com/ClusterCockpit/cc-metric-store/internal/util" "github.com/ClusterCockpit/cc-metric-store/pkg/resampler" @@ -76,13 +77,22 @@ func GetMemoryStore() *MemoryStore { } func Shutdown() { - ms := GetMemoryStore() log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir) - files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + var files int + var err error + + if config.Keys.Checkpoints.FileFormat == "json" { + ms := GetMemoryStore() + files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + } else { + files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true) + } + if err != nil { log.Printf("Writing checkpoint failed: %s\n", err.Error()) } log.Printf("Done! (%d files written)\n", files) + } func Retention(wg *sync.WaitGroup, ctx context.Context) {