From 182a1fa67dbb8a09f7462815187ba4fb56def6b9 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 21 Feb 2022 12:29:46 +0100 Subject: [PATCH] fix file descriptor leak --- archive.go | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/archive.go b/archive.go index aef367a..be47696 100644 --- a/archive.go +++ b/archive.go @@ -12,7 +12,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -57,7 +56,7 @@ type CheckpointFile struct { var ErrNoNewData error = errors.New("all data already archived") -var NumWorkers int = (runtime.NumCPU() / 4) + 1 +var NumWorkers int = 6 // Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // On a per-host basis a new JSON file is created. I have no idea if this will scale. @@ -213,7 +212,6 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { defer f.Close() bw := bufio.NewWriter(f) - if err = json.NewEncoder(bw).Encode(cf); err != nil { return err } @@ -226,7 +224,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { // Different host's data is loaded to memory in parallel. func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup - work := make(chan [2]string, NumWorkers*2) + work := make(chan [2]string, NumWorkers) n, errs := int32(0), int32(0) wg.Add(NumWorkers) @@ -379,6 +377,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err if err != nil { return filesLoaded, err } + defer f.Close() br := bufio.NewReader(f) cf := &CheckpointFile{} @@ -394,10 +393,6 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err return filesLoaded, err } - if err = f.Close(); err != nil { - return filesLoaded, err - } - filesLoaded += 1 } @@ -464,7 +459,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64) (int, err var wg sync.WaitGroup n, errs := int32(0), int32(0) - work := make(chan workItem, NumWorkers*2) + work := make(chan workItem, NumWorkers) wg.Add(NumWorkers) for worker := 0; worker < NumWorkers; worker++ { @@ -535,7 +530,9 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error) } defer f.Close() bw := bufio.NewWriter(f) + defer bw.Flush() zw := zip.NewWriter(bw) + defer zw.Close() n := 0 for _, jsonFile := range files { @@ -544,6 +541,7 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error) if err != nil { return n, err } + defer r.Close() w, err := zw.Create(jsonFile) if err != nil { @@ -560,13 +558,5 @@ func archiveCheckpoints(dir string, archiveDir string, from int64) (int, error) n += 1 } - if err = zw.Close(); err != nil { - return n, err - } - - if err = bw.Flush(); err != nil { - return n, err - } - return n, nil }