Introduce transparent compression for the ImportJob function in all archive backends

commit 0306723307
parent 6f49998ad3
Date:   2025-12-16 08:55:31 +01:00

3 changed files with 88 additions and 16 deletions
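All three backends follow the same write-side pattern: encode the job metric data into an in-memory buffer first, then gzip it only when the encoded payload exceeds 2000 bytes, so small jobs stay as plain JSON. The following is a minimal, self-contained sketch of that decision; the helper name compressIfLarge and the threshold constant are illustrative only and are not identifiers from this repository.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
)

// compressionThreshold mirrors the 2000-byte cutoff used in the diffs below;
// payloads at or below this size are stored as plain JSON.
const compressionThreshold = 2000

// compressIfLarge returns the gzip-compressed payload and true when the
// input exceeds the threshold, otherwise the original bytes and false.
func compressIfLarge(data []byte) ([]byte, bool, error) {
    if len(data) <= compressionThreshold {
        return data, false, nil
    }
    var buf bytes.Buffer
    gz := gzip.NewWriter(&buf)
    if _, err := gz.Write(data); err != nil {
        return nil, false, err
    }
    // Close flushes the remaining blocks and the gzip footer.
    if err := gz.Close(); err != nil {
        return nil, false, err
    }
    return buf.Bytes(), true, nil
}

func main() {
    payload := bytes.Repeat([]byte(`{"series":[0,1,2,3]}`), 200) // well above 2000 bytes
    out, compressed, err := compressIfLarge(payload)
    if err != nil {
        panic(err)
    }
    fmt.Printf("in=%d bytes, out=%d bytes, compressed=%v\n", len(payload), len(out), compressed)
}

Encoding into a buffer before writing is what makes the size check possible in the first place, at the cost of holding the encoded payload in memory once.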

@@ -603,19 +603,52 @@ func (fsa *FsArchive) ImportJob(
         return err
     }
 
-    f, err = os.Create(path.Join(dir, "data.json"))
-    if err != nil {
-        cclog.Error("Error while creating filepath for data.json")
+    var dataBuf bytes.Buffer
+    if err := EncodeJobData(&dataBuf, jobData); err != nil {
+        cclog.Error("Error while encoding job metricdata")
         return err
     }
-    if err := EncodeJobData(f, jobData); err != nil {
-        cclog.Error("Error while encoding job metricdata to data.json file")
-        return err
+
+    if dataBuf.Len() > 2000 {
+        f, err = os.Create(path.Join(dir, "data.json.gz"))
+        if err != nil {
+            cclog.Error("Error while creating filepath for data.json.gz")
+            return err
+        }
+        gzipWriter := gzip.NewWriter(f)
+        if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+            cclog.Error("Error while writing compressed job data")
+            gzipWriter.Close()
+            f.Close()
+            return err
+        }
+        if err := gzipWriter.Close(); err != nil {
+            cclog.Warn("Error while closing gzip writer")
+            f.Close()
+            return err
+        }
+        if err := f.Close(); err != nil {
+            cclog.Warn("Error while closing data.json.gz file")
+            return err
+        }
+    } else {
+        f, err = os.Create(path.Join(dir, "data.json"))
+        if err != nil {
+            cclog.Error("Error while creating filepath for data.json")
+            return err
+        }
+        if _, err := f.Write(dataBuf.Bytes()); err != nil {
+            cclog.Error("Error while writing job metricdata to data.json file")
+            f.Close()
+            return err
+        }
+        if err := f.Close(); err != nil {
+            cclog.Warn("Error while closing data.json file")
+            return err
+        }
     }
-    if err := f.Close(); err != nil {
-        cclog.Warn("Error while closing data.json file")
-    }
-    return err
+
+    return nil
 }
 
 func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
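Since the filesystem backend now writes either data.json or data.json.gz, any consumer of the archive has to probe for both names and gunzip transparently when the compressed variant exists. The loader below is only an illustrative sketch of that lookup; loadJobPayload, its package placement, and its error handling are assumptions, and the actual read path is not part of this diff.

// Illustrative placement; not the repository's file layout.
package archive

import (
    "compress/gzip"
    "io"
    "os"
    "path"
)

// loadJobPayload prefers data.json.gz and gunzips it transparently,
// falling back to the uncompressed data.json otherwise. "dir" is assumed
// to be the job's archive directory.
func loadJobPayload(dir string) ([]byte, error) {
    if f, err := os.Open(path.Join(dir, "data.json.gz")); err == nil {
        defer f.Close()
        gz, err := gzip.NewReader(f)
        if err != nil {
            return nil, err
        }
        defer gz.Close()
        return io.ReadAll(gz)
    }
    // No compressed variant present: read the plain file.
    return os.ReadFile(path.Join(dir, "data.json"))
}

Opening the .gz path doubles as the existence check, so the sketch needs no separate Stat call.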

@@ -467,7 +467,6 @@ func (s3a *S3Archive) StoreJobMeta(job *schema.Job) error {
 func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error {
     ctx := context.Background()
-
     // Upload meta.json
     metaKey := getS3Key(jobMeta, "meta.json")
     var metaBuf bytes.Buffer
     if err := EncodeJobMeta(&metaBuf, jobMeta); err != nil {
@@ -485,18 +484,37 @@ func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) er
         return err
     }
 
     // Upload data.json
-    dataKey := getS3Key(jobMeta, "data.json")
     var dataBuf bytes.Buffer
     if err := EncodeJobData(&dataBuf, jobData); err != nil {
         cclog.Error("S3Archive ImportJob() > encoding data error")
         return err
     }
 
+    var dataKey string
+    var dataBytes []byte
+    if dataBuf.Len() > 2000 {
+        dataKey = getS3Key(jobMeta, "data.json.gz")
+        var compressedBuf bytes.Buffer
+        gzipWriter := gzip.NewWriter(&compressedBuf)
+        if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+            cclog.Errorf("S3Archive ImportJob() > gzip write error: %v", err)
+            return err
+        }
+        if err := gzipWriter.Close(); err != nil {
+            cclog.Errorf("S3Archive ImportJob() > gzip close error: %v", err)
+            return err
+        }
+        dataBytes = compressedBuf.Bytes()
+    } else {
+        dataKey = getS3Key(jobMeta, "data.json")
+        dataBytes = dataBuf.Bytes()
+    }
+
     _, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
         Bucket: aws.String(s3a.bucket),
         Key:    aws.String(dataKey),
-        Body:   bytes.NewReader(dataBuf.Bytes()),
+        Body:   bytes.NewReader(dataBytes),
     })
     if err != nil {
         cclog.Errorf("S3Archive ImportJob() > PutObject data error: %v", err)

@@ -361,16 +361,37 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData)
         return err
     }
 
+    var dataBytes []byte
+    var compressed bool
+    if dataBuf.Len() > 2000 {
+        var compressedBuf bytes.Buffer
+        gzipWriter := gzip.NewWriter(&compressedBuf)
+        if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+            cclog.Errorf("SqliteArchive ImportJob() > gzip write error: %v", err)
+            return err
+        }
+        if err := gzipWriter.Close(); err != nil {
+            cclog.Errorf("SqliteArchive ImportJob() > gzip close error: %v", err)
+            return err
+        }
+        dataBytes = compressedBuf.Bytes()
+        compressed = true
+    } else {
+        dataBytes = dataBuf.Bytes()
+        compressed = false
+    }
+
     now := time.Now().Unix()
     _, err := sa.db.Exec(`
         INSERT INTO jobs (job_id, cluster, start_time, meta_json, data_json, data_compressed, created_at, updated_at)
-        VALUES (?, ?, ?, ?, ?, 0, ?, ?)
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(job_id, cluster, start_time) DO UPDATE SET
             meta_json = excluded.meta_json,
             data_json = excluded.data_json,
             data_compressed = excluded.data_compressed,
             updated_at = excluded.updated_at
-    `, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBuf.Bytes(), now, now)
+    `, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBytes, compressed, now, now)
     if err != nil {
         cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err)
         return err
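The SQLite backend instead records the state in the data_compressed column, so a read path would select the flag together with the blob and gunzip only when it is set. The query helper below is an illustrative sketch under that assumption; readJobData and the parameter types are not taken from the repository, only the table and column names follow the INSERT above.

// Illustrative read path; function name and parameter types are assumptions.
package archive

import (
    "bytes"
    "compress/gzip"
    "database/sql"
    "io"
)

// readJobData selects the blob together with the data_compressed flag and
// gunzips only when the flag is set; columns follow the INSERT shown above.
func readJobData(db *sql.DB, jobID int64, cluster string, startTime int64) ([]byte, error) {
    var blob []byte
    var compressed int
    err := db.QueryRow(
        `SELECT data_json, data_compressed FROM jobs
         WHERE job_id = ? AND cluster = ? AND start_time = ?`,
        jobID, cluster, startTime,
    ).Scan(&blob, &compressed)
    if err != nil {
        return nil, err
    }
    if compressed == 0 {
        return blob, nil
    }
    gz, err := gzip.NewReader(bytes.NewReader(blob))
    if err != nil {
        return nil, err
    }
    defer gz.Close()
    return io.ReadAll(gz)
}

Scanning the flag into an int keeps the sketch independent of how a particular SQLite driver maps booleans.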