Parallelize the Iter function in all archive backends

2025-12-16 09:11:09 +01:00
parent e6286768a7
commit 7fce6fa401
3 changed files with 144 additions and 84 deletions
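All three backends get the same shape: the goroutine that used to walk the archive sequentially now only enumerates work items into a small buffered channel, four workers load and decode each job and send it on the result channel, and the result channel is closed only after the work channel is closed and the WaitGroup has drained. A minimal, self-contained sketch of that fan-out pattern follows; every name in it (work, results, the loop bounds) is illustrative and not taken from this repository.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const numWorkers = 4
    work := make(chan int, numWorkers*2) // bounded hand-off between producer and workers
    results := make(chan string)

    go func() {
        defer close(results) // only closed after all workers have returned

        var wg sync.WaitGroup
        for range numWorkers {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for item := range work { // each worker drains the shared queue
                    results <- fmt.Sprintf("processed %d", item)
                }
            }()
        }

        for i := 0; i < 10; i++ { // single producer enumerates the work items
            work <- i
        }
        close(work) // lets the workers' range loops finish
        wg.Wait()
    }()

    for r := range results {
        fmt.Println(r)
    }
}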


@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"time"
@@ -490,7 +491,46 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
     ch := make(chan JobContainer)
     go func() {
+        defer close(ch)
+
+        numWorkers := 4
+        jobPaths := make(chan string, numWorkers*2)
+        var wg sync.WaitGroup
+
+        for range numWorkers {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for jobPath := range jobPaths {
+                    job, err := loadJobMeta(filepath.Join(jobPath, "meta.json"))
+                    if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+                        cclog.Errorf("in %s: %s", jobPath, err.Error())
+                        continue
+                    }
+
+                    if loadMetricData {
+                        isCompressed := true
+                        filename := filepath.Join(jobPath, "data.json.gz")
+                        if !util.CheckFileExists(filename) {
+                            filename = filepath.Join(jobPath, "data.json")
+                            isCompressed = false
+                        }
+
+                        data, err := loadJobData(filename, isCompressed)
+                        if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+                            cclog.Errorf("in %s: %s", jobPath, err.Error())
+                        }
+                        ch <- JobContainer{Meta: job, Data: &data}
+                    } else {
+                        ch <- JobContainer{Meta: job, Data: nil}
+                    }
+                }
+            }()
+        }
+
         clustersDir, err := os.ReadDir(fsa.path)
         if err != nil {
             cclog.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
@@ -507,7 +547,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
             for _, lvl1Dir := range lvl1Dirs {
                 if !lvl1Dir.IsDir() {
-                    // Could be the cluster.json file
                     continue
                 }
@@ -525,35 +564,17 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
                     for _, startTimeDir := range startTimeDirs {
                         if startTimeDir.IsDir() {
-                            job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
-                            if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
-                                cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
-                            }
-
-                            if loadMetricData {
-                                isCompressed := true
-                                filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
-                                if !util.CheckFileExists(filename) {
-                                    filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
-                                    isCompressed = false
-                                }
-
-                                data, err := loadJobData(filename, isCompressed)
-                                if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
-                                    cclog.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
-                                }
-                                ch <- JobContainer{Meta: job, Data: &data}
-                            } else {
-                                ch <- JobContainer{Meta: job, Data: nil}
-                            }
+                            jobPaths <- filepath.Join(dirpath, startTimeDir.Name())
                         }
                     }
                 }
             }
         }
-        close(ch)
+
+        close(jobPaths)
+        wg.Wait()
     }()
     return ch
 }
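The Iter contract is unchanged for callers: it still returns a receive-only channel that is closed once every job has been sent, so existing drain loops keep working; the only observable difference is that jobs no longer arrive in strict directory order, since four workers send on the channel concurrently. A hypothetical consumer, assuming only the JobContainer type from this diff (the countJobs helper itself is illustrative):

// countJobs drains an Iter channel and counts the jobs whose metadata loaded.
func countJobs(jobs <-chan JobContainer) int {
    count := 0
    for jc := range jobs { // terminates once the backend closes the channel
        if jc.Meta != nil {
            count++
        }
    }
    return count
}

// e.g. n := countJobs(fsa.Iter(false)), metadata only, Data stays nil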


@@ -17,6 +17,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"time"
@@ -813,29 +814,18 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
         ctx := context.Background()
         defer close(ch)
-        for _, cluster := range s3a.clusters {
-            prefix := cluster + "/"
 
-            paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
-                Bucket: aws.String(s3a.bucket),
-                Prefix: aws.String(prefix),
-            })
+        numWorkers := 4
+        metaKeys := make(chan string, numWorkers*2)
+        var wg sync.WaitGroup
 
-            for paginator.HasMorePages() {
-                page, err := paginator.NextPage(ctx)
-                if err != nil {
-                    cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
-                }
-
-                for _, obj := range page.Contents {
-                    if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
-                        continue
-                    }
-
-                    // Load job metadata
+        for range numWorkers {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for metaKey := range metaKeys {
                     result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
                         Bucket: aws.String(s3a.bucket),
-                        Key:    obj.Key,
+                        Key:    aws.String(metaKey),
                     })
                     if err != nil {
                         cclog.Errorf("S3Archive Iter() > GetObject meta error: %v", err)
@@ -867,8 +857,34 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
                         ch <- JobContainer{Meta: job, Data: nil}
                     }
                 }
-            }
-        }
+            }()
+        }
+
+        for _, cluster := range s3a.clusters {
+            prefix := cluster + "/"
+
+            paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
+                Bucket: aws.String(s3a.bucket),
+                Prefix: aws.String(prefix),
+            })
+
+            for paginator.HasMorePages() {
+                page, err := paginator.NextPage(ctx)
+                if err != nil {
+                    cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
+                }
+
+                for _, obj := range page.Contents {
+                    if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
+                        continue
+                    }
+                    metaKeys <- *obj.Key
+                }
+            }
+        }
+
+        close(metaKeys)
+        wg.Wait()
     }()
     return ch


@@ -16,6 +16,7 @@ import (
 	"os"
 	"slices"
 	"strconv"
+	"sync"
 	"text/tabwriter"
 	"time"
@@ -547,48 +548,56 @@ func (sa *SqliteArchive) CompressLast(starttime int64) int64 {
     return last
 }
 
+type sqliteJobRow struct {
+    metaBlob   []byte
+    dataBlob   []byte
+    compressed bool
+}
+
 func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
     ch := make(chan JobContainer)
     go func() {
         defer close(ch)
-        rows, err := sa.db.Query("SELECT job_id, cluster, start_time, meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time")
+        rows, err := sa.db.Query("SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time")
         if err != nil {
             cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error())
         }
         defer rows.Close()
 
-        for rows.Next() {
-            var jobID int64
-            var cluster string
-            var startTime int64
-            var metaBlob []byte
-            var dataBlob []byte
-            var compressed bool
-            if err := rows.Scan(&jobID, &cluster, &startTime, &metaBlob, &dataBlob, &compressed); err != nil {
-                cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
-                continue
-            }
+        numWorkers := 4
+        jobRows := make(chan sqliteJobRow, numWorkers*2)
+        var wg sync.WaitGroup
 
-            job, err := DecodeJobMeta(bytes.NewReader(metaBlob))
-            if err != nil {
-                cclog.Errorf("SqliteArchive Iter() > decode meta error: %v", err)
-                continue
-            }
+        for range numWorkers {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for row := range jobRows {
+                    job, err := DecodeJobMeta(bytes.NewReader(row.metaBlob))
+                    if err != nil {
+                        cclog.Errorf("SqliteArchive Iter() > decode meta error: %v", err)
+                        continue
+                    }
 
-            if loadMetricData && dataBlob != nil {
-                var reader io.Reader = bytes.NewReader(dataBlob)
-                if compressed {
-                    gzipReader, err := gzip.NewReader(reader)
-                    if err != nil {
-                        cclog.Errorf("SqliteArchive Iter() > gzip error: %v", err)
-                        ch <- JobContainer{Meta: job, Data: nil}
-                        continue
-                    }
-                    defer gzipReader.Close()
-                    reader = gzipReader
-                }
+                    if loadMetricData && row.dataBlob != nil {
+                        var reader io.Reader = bytes.NewReader(row.dataBlob)
+                        if row.compressed {
+                            gzipReader, err := gzip.NewReader(reader)
+                            if err != nil {
+                                cclog.Errorf("SqliteArchive Iter() > gzip error: %v", err)
+                                ch <- JobContainer{Meta: job, Data: nil}
+                                continue
+                            }
+                            decompressed, err := io.ReadAll(gzipReader)
+                            gzipReader.Close()
+                            if err != nil {
+                                cclog.Errorf("SqliteArchive Iter() > decompress error: %v", err)
+                                ch <- JobContainer{Meta: job, Data: nil}
+                                continue
+                            }
+                            reader = bytes.NewReader(decompressed)
+                        }
 
-                key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
+                        key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
@@ -604,6 +613,20 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
                     }
                 }
-        }
-    }()
+            }()
+        }
+
+        for rows.Next() {
+            var row sqliteJobRow
+            if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil {
+                cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
+                continue
+            }
+            jobRows <- row
+        }
+
+        close(jobRows)
+        wg.Wait()
+    }()
     return ch
 }
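The SQLite worker also changes how compressed blobs are handled: the old loop deferred gzipReader.Close() inside for rows.Next(), so every reader stayed open until the whole iteration ended, and the gzip stream itself was handed on as the reader; the new code decompresses the blob into memory, closes the gzip reader immediately, and wraps the result in a fresh bytes.Reader. A standalone sketch of that idiom under the same assumptions (the decompress helper and the sample payload are illustrative):

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

// decompress inflates a gzip blob fully into memory and closes the reader
// right away instead of deferring the Close across an outer loop.
func decompress(blob []byte) ([]byte, error) {
    gz, err := gzip.NewReader(bytes.NewReader(blob))
    if err != nil {
        return nil, err
    }
    data, err := io.ReadAll(gz)
    gz.Close()
    if err != nil {
        return nil, err
    }
    return data, nil
}

func main() {
    // Round-trip: compress a small payload, then inflate it again.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    zw.Write([]byte(`{"jobId": 42}`))
    zw.Close()

    out, err := decompress(buf.Bytes())
    fmt.Println(string(out), err)
}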