mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-28 23:11:46 +01:00
Prevent file not closed on error in avro checkpoint
This commit is contained in:
@@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open existing avro file: %v", err)
|
return fmt.Errorf("failed to open existing avro file: %v", err)
|
||||||
}
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
br := bufio.NewReader(f)
|
br := bufio.NewReader(f)
|
||||||
|
|
||||||
@@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
|||||||
}
|
}
|
||||||
codec = reader.Codec()
|
codec = reader.Codec()
|
||||||
schema = codec.Schema()
|
schema = codec.Schema()
|
||||||
|
|
||||||
f.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
|
timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
|
||||||
@@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
|||||||
return fmt.Errorf("failed to compare read and generated schema: %v", err)
|
return fmt.Errorf("failed to compare read and generated schema: %v", err)
|
||||||
}
|
}
|
||||||
if flag && readFlag && !errors.Is(err_, os.ErrNotExist) {
|
if flag && readFlag && !errors.Is(err_, os.ErrNotExist) {
|
||||||
|
// Use closure to ensure file is closed even on error
|
||||||
f.Close()
|
err := func() error {
|
||||||
|
f2, err := os.Open(filePath)
|
||||||
f, err = os.Open(filePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to open Avro file: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
br := bufio.NewReader(f)
|
|
||||||
|
|
||||||
ocfReader, err := goavro.NewOCFReader(br)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for ocfReader.Scan() {
|
|
||||||
record, err := ocfReader.Read()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read record: %v", err)
|
return fmt.Errorf("failed to open Avro file: %v", err)
|
||||||
|
}
|
||||||
|
defer f2.Close()
|
||||||
|
|
||||||
|
br := bufio.NewReader(f2)
|
||||||
|
|
||||||
|
ocfReader, err := goavro.NewOCFReader(br)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
recordList = append(recordList, record.(map[string]any))
|
for ocfReader.Scan() {
|
||||||
}
|
record, err := ocfReader.Read()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read record: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
f.Close()
|
recordList = append(recordList, record.(map[string]any))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = os.Remove(filePath)
|
err = os.Remove(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to append new avro file: %v", err)
|
return fmt.Errorf("failed to append new avro file: %v", err)
|
||||||
}
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
// fmt.Printf("Codec : %#v\n", codec)
|
// fmt.Printf("Codec : %#v\n", codec)
|
||||||
|
|
||||||
@@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
|||||||
return fmt.Errorf("failed to append record: %v", err)
|
return fmt.Errorf("failed to append record: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
f.Close()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user