From 7fce6fa401acb9cf443b561ca2e44bb436287d19 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Tue, 16 Dec 2025 09:11:09 +0100
Subject: [PATCH] Parallelize the Iter function in all archive backends

---
 pkg/archive/fsBackend.go     |  69 +++++++++++++++--------
 pkg/archive/s3Backend.go     |  56 ++++++++++++-------
 pkg/archive/sqliteBackend.go | 103 +++++++++++++++++++++--------------
 3 files changed, 144 insertions(+), 84 deletions(-)

diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 22c1772..b8d2a94 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -490,7 +491,46 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 
 func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 	ch := make(chan JobContainer)
+
 	go func() {
+		defer close(ch)
+
+		numWorkers := 4
+		jobPaths := make(chan string, numWorkers*2)
+		var wg sync.WaitGroup
+
+		for range numWorkers {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for jobPath := range jobPaths {
+					job, err := loadJobMeta(filepath.Join(jobPath, "meta.json"))
+					if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+						cclog.Errorf("in %s: %s", jobPath, err.Error())
+						continue
+					}
+
+					if loadMetricData {
+						isCompressed := true
+						filename := filepath.Join(jobPath, "data.json.gz")
+
+						if !util.CheckFileExists(filename) {
+							filename = filepath.Join(jobPath, "data.json")
+							isCompressed = false
+						}
+
+						data, err := loadJobData(filename, isCompressed)
+						if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+							cclog.Errorf("in %s: %s", jobPath, err.Error())
+						}
+						ch <- JobContainer{Meta: job, Data: &data}
+					} else {
+						ch <- JobContainer{Meta: job, Data: nil}
+					}
+				}
+			}()
+		}
+
 		clustersDir, err := os.ReadDir(fsa.path)
 		if err != nil {
 			cclog.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
@@ -507,7 +547,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 
 		for _, lvl1Dir := range lvl1Dirs {
 			if !lvl1Dir.IsDir() {
-				// Could be the cluster.json file
 				continue
 			}
 
@@ -525,35 +564,17 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 
 			for _, startTimeDir := range startTimeDirs {
 				if startTimeDir.IsDir() {
-					job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
-					if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
-						cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
-					}
-
-					if loadMetricData {
-						isCompressed := true
-						filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
-
-						if !util.CheckFileExists(filename) {
-							filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
-							isCompressed = false
-						}
-
-						data, err := loadJobData(filename, isCompressed)
-						if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
-							cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
-						}
-						ch <- JobContainer{Meta: job, Data: &data}
-					} else {
-						ch <- JobContainer{Meta: job, Data: nil}
-					}
+					jobPaths <- filepath.Join(dirpath, startTimeDir.Name())
 				}
 			}
 		}
 	}
-	close(ch)
+
+	close(jobPaths)
+	wg.Wait()
 	}()
+
 	return ch
 }
 
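NOTE (aside, not part of the patch): all three Iter implementations now share
the same fan-out/fan-in shape. The existing enumeration goroutine becomes the
producer feeding a bounded work channel, a fixed pool of four workers does the
loading and decoding and sends results on ch, and the deferred close(ch) only
runs after close(work) and wg.Wait() confirm that every worker has exited.
A minimal self-contained sketch of that shape (requires Go 1.22+ for integer
range; iter, work, and the string payload are illustrative, not taken from
the repository):

package main

import (
	"fmt"
	"sync"
)

// iter mirrors the patch's shape: producer -> bounded work channel ->
// worker pool -> results channel that closes only after all workers exit.
func iter() <-chan string {
	ch := make(chan string)

	go func() {
		// Runs last: by then close(work) + wg.Wait() guarantee that no
		// worker can still send on ch.
		defer close(ch)

		numWorkers := 4
		work := make(chan int, numWorkers*2) // small buffer decouples producer and workers
		var wg sync.WaitGroup

		// Fan-out: a fixed pool of workers drains the work channel.
		for range numWorkers {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for item := range work {
					// Stand-in for loading meta.json / data.json per job.
					ch <- fmt.Sprintf("job %d processed", item)
				}
			}()
		}

		// Producer: enumeration stays sequential, as in the backends.
		for i := range 10 {
			work <- i
		}

		// Fan-in: end the workers' range loops, then wait for them.
		close(work)
		wg.Wait()
	}()

	return ch
}

func main() {
	for s := range iter() {
		fmt.Println(s)
	}
}

Closing work terminates each worker's range loop; waiting on wg before the
deferred close(ch) fires is what makes closing the results channel safe.
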
diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go
index eacdde7..c874a32 100644
--- a/pkg/archive/s3Backend.go
+++ b/pkg/archive/s3Backend.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -813,29 +814,18 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
 		ctx := context.Background()
 		defer close(ch)
 
-		for _, cluster := range s3a.clusters {
-			prefix := cluster + "/"
+		numWorkers := 4
+		metaKeys := make(chan string, numWorkers*2)
+		var wg sync.WaitGroup
 
-			paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
-				Bucket: aws.String(s3a.bucket),
-				Prefix: aws.String(prefix),
-			})
-
-			for paginator.HasMorePages() {
-				page, err := paginator.NextPage(ctx)
-				if err != nil {
-					cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
-				}
-
-				for _, obj := range page.Contents {
-					if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
-						continue
-					}
-
-					// Load job metadata
+		for range numWorkers {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for metaKey := range metaKeys {
 					result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
 						Bucket: aws.String(s3a.bucket),
-						Key:    obj.Key,
+						Key:    aws.String(metaKey),
 					})
 					if err != nil {
 						cclog.Errorf("S3Archive Iter() > GetObject meta error: %v", err)
@@ -867,8 +857,34 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
 						ch <- JobContainer{Meta: job, Data: nil}
 					}
 				}
+			}()
+		}
+
+		for _, cluster := range s3a.clusters {
+			prefix := cluster + "/"
+
+			paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
+				Bucket: aws.String(s3a.bucket),
+				Prefix: aws.String(prefix),
+			})
+
+			for paginator.HasMorePages() {
+				page, err := paginator.NextPage(ctx)
+				if err != nil {
+					cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
+				}
+
+				for _, obj := range page.Contents {
+					if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
+						continue
+					}
+					metaKeys <- *obj.Key
+				}
 			}
 		}
+
+		close(metaKeys)
+		wg.Wait()
 	}()
 
 	return ch
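NOTE (aside, not part of the patch): with several workers sending concurrently,
ch no longer delivers jobs in the cluster/start_time order that the serial
loops (and, in the SQLite backend, the SQL ORDER BY) used to produce; the
producer still enumerates in order, but workers interleave their sends.
Consumers that relied on that order must restore it themselves. A sketch of
one way to do so by buffering and sorting (the JobMeta fields mirror those
referenced in the patch; collectAndSort and the trimmed-down types are
hypothetical):

package main

import (
	"fmt"
	"sort"
)

// Minimal stand-ins for the real schema types.
type JobMeta struct {
	Cluster   string
	JobID     int64
	StartTime int64
}

type JobContainer struct {
	Meta *JobMeta
	Data *string // placeholder for the real metric-data type
}

// collectAndSort restores the cluster/start_time ordering the serial Iter
// produced, at the cost of buffering every job in memory.
func collectAndSort(ch <-chan JobContainer) []JobContainer {
	var jobs []JobContainer
	for jc := range ch {
		jobs = append(jobs, jc)
	}
	sort.Slice(jobs, func(i, j int) bool {
		if jobs[i].Meta.Cluster != jobs[j].Meta.Cluster {
			return jobs[i].Meta.Cluster < jobs[j].Meta.Cluster
		}
		return jobs[i].Meta.StartTime < jobs[j].Meta.StartTime
	})
	return jobs
}

func main() {
	// Simulate out-of-order arrival from the worker pool.
	ch := make(chan JobContainer, 3)
	ch <- JobContainer{Meta: &JobMeta{Cluster: "beta", StartTime: 200}}
	ch <- JobContainer{Meta: &JobMeta{Cluster: "alpha", StartTime: 900}}
	ch <- JobContainer{Meta: &JobMeta{Cluster: "alpha", StartTime: 100}}
	close(ch)

	for _, jc := range collectAndSort(ch) {
		fmt.Println(jc.Meta.Cluster, jc.Meta.StartTime)
	}
}

For archives too large to buffer, a consumer would instead need to tolerate
arbitrary arrival order.
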
diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go
index 589beea..6fa188b 100644
--- a/pkg/archive/sqliteBackend.go
+++ b/pkg/archive/sqliteBackend.go
@@ -16,6 +16,7 @@ import (
 	"os"
 	"slices"
 	"strconv"
+	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -547,62 +548,84 @@ func (sa *SqliteArchive) CompressLast(starttime int64) int64 {
 	return last
 }
 
+type sqliteJobRow struct {
+	metaBlob   []byte
+	dataBlob   []byte
+	compressed bool
+}
+
 func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
 	ch := make(chan JobContainer)
 
 	go func() {
 		defer close(ch)
 
-		rows, err := sa.db.Query("SELECT job_id, cluster, start_time, meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time")
+		rows, err := sa.db.Query("SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time")
 		if err != nil {
 			cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error())
 		}
 		defer rows.Close()
 
-		for rows.Next() {
-			var jobID int64
-			var cluster string
-			var startTime int64
-			var metaBlob []byte
-			var dataBlob []byte
-			var compressed bool
+		numWorkers := 4
+		jobRows := make(chan sqliteJobRow, numWorkers*2)
+		var wg sync.WaitGroup
 
-			if err := rows.Scan(&jobID, &cluster, &startTime, &metaBlob, &dataBlob, &compressed); err != nil {
+		for range numWorkers {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for row := range jobRows {
+					job, err := DecodeJobMeta(bytes.NewReader(row.metaBlob))
+					if err != nil {
+						cclog.Errorf("SqliteArchive Iter() > decode meta error: %v", err)
+						continue
+					}
+
+					if loadMetricData && row.dataBlob != nil {
+						var reader io.Reader = bytes.NewReader(row.dataBlob)
+						if row.compressed {
+							gzipReader, err := gzip.NewReader(reader)
+							if err != nil {
+								cclog.Errorf("SqliteArchive Iter() > gzip error: %v", err)
+								ch <- JobContainer{Meta: job, Data: nil}
+								continue
+							}
+							decompressed, err := io.ReadAll(gzipReader)
+							gzipReader.Close()
+							if err != nil {
+								cclog.Errorf("SqliteArchive Iter() > decompress error: %v", err)
+								ch <- JobContainer{Meta: job, Data: nil}
+								continue
+							}
+							reader = bytes.NewReader(decompressed)
+						}
+
+						key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
+						jobData, err := DecodeJobData(reader, key)
+						if err != nil {
+							cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err)
+							ch <- JobContainer{Meta: job, Data: nil}
+						} else {
+							ch <- JobContainer{Meta: job, Data: &jobData}
+						}
+					} else {
+						ch <- JobContainer{Meta: job, Data: nil}
+					}
+				}
+			}()
+		}
+
+		for rows.Next() {
+			var row sqliteJobRow
+			if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil {
 				cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
 				continue
 			}
-
-			job, err := DecodeJobMeta(bytes.NewReader(metaBlob))
-			if err != nil {
-				cclog.Errorf("SqliteArchive Iter() > decode meta error: %v", err)
-				continue
-			}
-
-			if loadMetricData && dataBlob != nil {
-				var reader io.Reader = bytes.NewReader(dataBlob)
-				if compressed {
-					gzipReader, err := gzip.NewReader(reader)
-					if err != nil {
-						cclog.Errorf("SqliteArchive Iter() > gzip error: %v", err)
-						ch <- JobContainer{Meta: job, Data: nil}
-						continue
-					}
-					defer gzipReader.Close()
-					reader = gzipReader
-				}
-
-				key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
-				jobData, err := DecodeJobData(reader, key)
-				if err != nil {
-					cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err)
-					ch <- JobContainer{Meta: job, Data: nil}
-				} else {
-					ch <- JobContainer{Meta: job, Data: &jobData}
-				}
-			} else {
-				ch <- JobContainer{Meta: job, Data: nil}
-			}
+			jobRows <- row
 		}
+
+		close(jobRows)
+		wg.Wait()
 	}()
 
 	return ch
 }
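NOTE (aside, not part of the patch): numWorkers is hard-coded to 4 in all
three backends. If the pool size should ever track the machine instead, one
option is clamping GOMAXPROCS between a floor and a cap; workerCount below is
a hypothetical helper, not part of this change:

package main

import (
	"fmt"
	"runtime"
)

// workerCount clamps GOMAXPROCS to [2, maxWorkers]: a floor of 2 keeps I/O
// and decode work overlapping on small machines, a cap avoids oversubscribing
// what is mostly an I/O-bound walk on large ones.
func workerCount(maxWorkers int) int {
	n := runtime.GOMAXPROCS(0)
	if n < 2 {
		n = 2
	}
	if n > maxWorkers {
		n = maxWorkers
	}
	return n
}

func main() {
	fmt.Println("workers:", workerCount(8))
}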