From 55cb2cb6d647f26f280f24c8b76b5cd9a6c30a88 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:10:26 +0100 Subject: [PATCH] Prevent file not closed on error in avro checkpoint --- pkg/metricstore/avroCheckpoint.go | 52 ++++++++++++++++--------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go index aa14ce5a..14898186 100644 --- a/pkg/metricstore/avroCheckpoint.go +++ b/pkg/metricstore/avroCheckpoint.go @@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to open existing avro file: %v", err) } + defer f.Close() br := bufio.NewReader(f) @@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { } codec = reader.Codec() schema = codec.Schema() - - f.Close() } timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() @@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to compare read and generated schema: %v", err) } if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - - f.Close() - - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - - br := bufio.NewReader(f) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() + // Use closure to ensure file is closed even on error + err := func() error { + f2, err := os.Open(filePath) if err != nil { - return fmt.Errorf("failed to read record: %v", err) + return fmt.Errorf("failed to open Avro file: %v", err) + } + defer f2.Close() + + br := bufio.NewReader(f2) + + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) } - recordList = append(recordList, record.(map[string]any)) - } + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("failed to read record: %v", err) + } - f.Close() + recordList = append(recordList, record.(map[string]any)) + } + + return nil + }() + if err != nil { + return err + } err = os.Remove(filePath) if err != nil { @@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to append new avro file: %v", err) } + defer f.Close() // fmt.Printf("Codec : %#v\n", codec) @@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to append record: %v", err) } - f.Close() - return nil }