From 3c1a7e0171f8e4241e26fae691af52ef0b6eef7a Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 28 Oct 2025 09:42:28 +0100 Subject: [PATCH] Fixed the behavior of avro write to old files --- internal/memorystore/avroCheckpoint.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 3642186..4d36151 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -25,7 +25,7 @@ import ( ) var NumAvroWorkers int = 4 - +var startUp bool = true var ErrNoNewData error = errors.New("no data in the pool") func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { @@ -96,6 +96,9 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { if errs > 0 { return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) } + + startUp = false + return int(n), nil } @@ -143,6 +146,10 @@ func getTimestamp(dir string) int64 { interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) updateTime := time.Unix(maxTS, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() + if startUp { + return 0 + } + if updateTime < time.Now().Unix() { return 0 }