diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 1e9d7db..22c1772 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -603,19 +603,52 @@ func (fsa *FsArchive) ImportJob(
 		return err
 	}
 
-	f, err = os.Create(path.Join(dir, "data.json"))
-	if err != nil {
-		cclog.Error("Error while creating filepath for data.json")
+	var dataBuf bytes.Buffer
+	if err := EncodeJobData(&dataBuf, jobData); err != nil {
+		cclog.Error("Error while encoding job metricdata")
 		return err
 	}
-	if err := EncodeJobData(f, jobData); err != nil {
-		cclog.Error("Error while encoding job metricdata to data.json file")
-		return err
+
+	if dataBuf.Len() > 2000 {
+		f, err = os.Create(path.Join(dir, "data.json.gz"))
+		if err != nil {
+			cclog.Error("Error while creating filepath for data.json.gz")
+			return err
+		}
+		gzipWriter := gzip.NewWriter(f)
+		if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+			cclog.Error("Error while writing compressed job data")
+			gzipWriter.Close()
+			f.Close()
+			return err
+		}
+		if err := gzipWriter.Close(); err != nil {
+			cclog.Warn("Error while closing gzip writer")
+			f.Close()
+			return err
+		}
+		if err := f.Close(); err != nil {
+			cclog.Warn("Error while closing data.json.gz file")
+			return err
+		}
+	} else {
+		f, err = os.Create(path.Join(dir, "data.json"))
+		if err != nil {
+			cclog.Error("Error while creating filepath for data.json")
+			return err
+		}
+		if _, err := f.Write(dataBuf.Bytes()); err != nil {
+			cclog.Error("Error while writing job metricdata to data.json file")
+			f.Close()
+			return err
+		}
+		if err := f.Close(); err != nil {
+			cclog.Warn("Error while closing data.json file")
+			return err
+		}
 	}
-	if err := f.Close(); err != nil {
-		cclog.Warn("Error while closing data.json file")
-	}
-	return err
+
+	return nil
 }
 
 func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go
index 5b3d9f0..eacdde7 100644
--- a/pkg/archive/s3Backend.go
+++ b/pkg/archive/s3Backend.go
@@ -467,7 +467,6 @@ func (s3a *S3Archive) StoreJobMeta(job *schema.Job) error {
 func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error {
 	ctx := context.Background()
 
-	// Upload meta.json
 	metaKey := getS3Key(jobMeta, "meta.json")
 	var metaBuf bytes.Buffer
 	if err := EncodeJobMeta(&metaBuf, jobMeta); err != nil {
@@ -485,18 +484,37 @@ func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) er
 		return err
 	}
 
-	// Upload data.json
-	dataKey := getS3Key(jobMeta, "data.json")
 	var dataBuf bytes.Buffer
 	if err := EncodeJobData(&dataBuf, jobData); err != nil {
 		cclog.Error("S3Archive ImportJob() > encoding data error")
 		return err
 	}
 
+	var dataKey string
+	var dataBytes []byte
+
+	if dataBuf.Len() > 2000 {
+		dataKey = getS3Key(jobMeta, "data.json.gz")
+		var compressedBuf bytes.Buffer
+		gzipWriter := gzip.NewWriter(&compressedBuf)
+		if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+			cclog.Errorf("S3Archive ImportJob() > gzip write error: %v", err)
+			return err
+		}
+		if err := gzipWriter.Close(); err != nil {
+			cclog.Errorf("S3Archive ImportJob() > gzip close error: %v", err)
+			return err
+		}
+		dataBytes = compressedBuf.Bytes()
+	} else {
+		dataKey = getS3Key(jobMeta, "data.json")
+		dataBytes = dataBuf.Bytes()
+	}
+
 	_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
 		Bucket: aws.String(s3a.bucket),
 		Key:    aws.String(dataKey),
-		Body:   bytes.NewReader(dataBuf.Bytes()),
+		Body:   bytes.NewReader(dataBytes),
 	})
 	if err != nil {
 		cclog.Errorf("S3Archive ImportJob() > PutObject data error: %v", err)
diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go
index 49aeb79..589beea 100644
--- a/pkg/archive/sqliteBackend.go
+++ b/pkg/archive/sqliteBackend.go
@@ -361,16 +361,37 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData)
 		return err
 	}
 
+	var dataBytes []byte
+	var compressed bool
+
+	if dataBuf.Len() > 2000 {
+		var compressedBuf bytes.Buffer
+		gzipWriter := gzip.NewWriter(&compressedBuf)
+		if _, err := gzipWriter.Write(dataBuf.Bytes()); err != nil {
+			cclog.Errorf("SqliteArchive ImportJob() > gzip write error: %v", err)
+			return err
+		}
+		if err := gzipWriter.Close(); err != nil {
+			cclog.Errorf("SqliteArchive ImportJob() > gzip close error: %v", err)
+			return err
+		}
+		dataBytes = compressedBuf.Bytes()
+		compressed = true
+	} else {
+		dataBytes = dataBuf.Bytes()
+		compressed = false
+	}
+
 	now := time.Now().Unix()
 	_, err := sa.db.Exec(`
 		INSERT INTO jobs (job_id, cluster, start_time, meta_json, data_json, data_compressed, created_at, updated_at)
-		VALUES (?, ?, ?, ?, ?, 0, ?, ?)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 		ON CONFLICT(job_id, cluster, start_time) DO UPDATE SET
 			meta_json = excluded.meta_json,
 			data_json = excluded.data_json,
 			data_compressed = excluded.data_compressed,
 			updated_at = excluded.updated_at
-	`, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBytes, compressed, now, now)
+	`, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBytes, compressed, now, now)
 	if err != nil {
 		cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err)
 		return err
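
Note (not part of the diff): all three backends gate gzip compression on the same 2000-byte encoded-payload threshold. The sketch below isolates that shared write-side pattern as a standalone helper; the name gzipIfLarge and its signature are illustrative, not code from this change.

```go
package archive

import (
	"bytes"
	"compress/gzip"
)

// gzipIfLarge is a hypothetical helper mirroring the pattern in this diff:
// it returns buf's contents gzip-compressed when they exceed threshold,
// plus a flag telling the caller which form it got.
func gzipIfLarge(buf *bytes.Buffer, threshold int) ([]byte, bool, error) {
	if buf.Len() <= threshold {
		return buf.Bytes(), false, nil
	}
	var compressed bytes.Buffer
	gw := gzip.NewWriter(&compressed)
	if _, err := gw.Write(buf.Bytes()); err != nil {
		gw.Close()
		return nil, false, err
	}
	// Close flushes buffered data and writes the gzip footer,
	// so its error must be checked, as the diff does in each backend.
	if err := gw.Close(); err != nil {
		return nil, false, err
	}
	return compressed.Bytes(), true, nil
}
```

Each backend maps the resulting flag onto its own storage convention: the data.json.gz file name on the filesystem, the data.json.gz object key in S3, and the data_compressed column in SQLite.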