fix file descriptor leak

This commit is contained in:
Lou Knauer 2022-02-21 12:29:46 +01:00
parent 29fe272ea9
commit 182a1fa67d

View File

@ -12,7 +12,6 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -57,7 +56,7 @@ type CheckpointFile struct {
var ErrNoNewData error = errors.New("all data already archived") var ErrNoNewData error = errors.New("all data already archived")
var NumWorkers int = (runtime.NumCPU() / 4) + 1 var NumWorkers int = 6
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
// On a per-host basis a new JSON file is created. I have no idea if this will scale. // On a per-host basis a new JSON file is created. I have no idea if this will scale.
@ -213,7 +212,6 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
defer f.Close() defer f.Close()
bw := bufio.NewWriter(f) bw := bufio.NewWriter(f)
if err = json.NewEncoder(bw).Encode(cf); err != nil { if err = json.NewEncoder(bw).Encode(cf); err != nil {
return err return err
} }
@ -226,7 +224,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
// Different host's data is loaded to memory in parallel. // Different host's data is loaded to memory in parallel.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
work := make(chan [2]string, NumWorkers*2) work := make(chan [2]string, NumWorkers)
n, errs := int32(0), int32(0) n, errs := int32(0), int32(0)
wg.Add(NumWorkers) wg.Add(NumWorkers)
@ -379,6 +377,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
if err != nil { if err != nil {
return filesLoaded, err return filesLoaded, err
} }
defer f.Close()
br := bufio.NewReader(f) br := bufio.NewReader(f)
cf := &CheckpointFile{} cf := &CheckpointFile{}
@ -394,10 +393,6 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
return filesLoaded, err return filesLoaded, err
} }
if err = f.Close(); err != nil {
return filesLoaded, err
}
filesLoaded += 1 filesLoaded += 1
} }
@ -464,7 +459,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64) (int, err
var wg sync.WaitGroup var wg sync.WaitGroup
n, errs := int32(0), int32(0) n, errs := int32(0), int32(0)
work := make(chan workItem, NumWorkers*2) work := make(chan workItem, NumWorkers)
wg.Add(NumWorkers) wg.Add(NumWorkers)
for worker := 0; worker < NumWorkers; worker++ { for worker := 0; worker < NumWorkers; worker++ {
@ -535,7 +530,9 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error)
} }
defer f.Close() defer f.Close()
bw := bufio.NewWriter(f) bw := bufio.NewWriter(f)
defer bw.Flush()
zw := zip.NewWriter(bw) zw := zip.NewWriter(bw)
defer zw.Close()
n := 0 n := 0
for _, jsonFile := range files { for _, jsonFile := range files {
@ -544,6 +541,7 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error)
if err != nil { if err != nil {
return n, err return n, err
} }
defer r.Close()
w, err := zw.Create(jsonFile) w, err := zw.Create(jsonFile)
if err != nil { if err != nil {
@ -560,13 +558,5 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error)
n += 1 n += 1
} }
if err = zw.Close(); err != nil {
return n, err
}
if err = bw.Flush(); err != nil {
return n, err
}
return n, nil return n, nil
} }