From 6cfed989ff0897150da2d968871a2cd5669536c0 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Mon, 15 Dec 2025 11:23:53 +0100
Subject: [PATCH] Fix bugs in sqlite backend

---
 pkg/archive/sqliteBackend.go | 55 ++++++++++++++++++++++++++++--------
 1 file changed, 44 insertions(+), 11 deletions(-)

diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go
index edff923..49aeb79 100644
--- a/pkg/archive/sqliteBackend.go
+++ b/pkg/archive/sqliteBackend.go
@@ -14,6 +14,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"slices"
 	"strconv"
 	"text/tabwriter"
 	"time"
@@ -251,6 +252,7 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		cclog.Errorf("SqliteArchive LoadJobData() > query error: %v", err)
 		return nil, err
 	}
+	key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
 
 	var reader io.Reader = bytes.NewReader(dataBlob)
 	if compressed {
@@ -268,10 +270,10 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
 			return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
 		}
-		return DecodeJobData(bytes.NewReader(data), "sqlite")
+		return DecodeJobData(bytes.NewReader(data), key)
 	}
 
-	return DecodeJobData(reader, "sqlite")
+	return DecodeJobData(reader, key)
 }
 
 func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
@@ -283,6 +285,7 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
 		cclog.Errorf("SqliteArchive LoadJobStats() > query error: %v", err)
 		return nil, err
 	}
+	key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
 
 	var reader io.Reader = bytes.NewReader(dataBlob)
 	if compressed {
@@ -300,10 +303,10 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
 		if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
 			return nil, fmt.Errorf("validate job data: %v", err)
 		}
-		return DecodeJobStats(bytes.NewReader(data), "sqlite")
+		return DecodeJobStats(bytes.NewReader(data), key)
 	}
 
-	return DecodeJobStats(reader, "sqlite")
+	return DecodeJobStats(reader, key)
 }
 
 func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
@@ -314,9 +317,11 @@ func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 		return nil, err
 	}
 
-	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
-		cclog.Warnf("Validate cluster config: %v\n", err)
-		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+	if config.Keys.Validate {
+		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
+			cclog.Warnf("Validate cluster config: %v\n", err)
+			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+		}
 	}
 
 	return DecodeCluster(bytes.NewReader(configBlob))
@@ -337,7 +342,6 @@ func (sa *SqliteArchive) StoreJobMeta(job *schema.Job) error {
 			meta_json = excluded.meta_json,
 			updated_at = excluded.updated_at
 	`, job.JobID, job.Cluster, job.StartTime, metaBuf.Bytes(), now, now)
-
 	if err != nil {
 		cclog.Errorf("SqliteArchive StoreJobMeta() > insert error: %v", err)
 		return err
@@ -367,7 +371,6 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData)
 			data_compressed = excluded.data_compressed,
 			updated_at = excluded.updated_at
 	`, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBuf.Bytes(), now, now)
-
 	if err != nil {
 		cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err)
 		return err
@@ -494,7 +497,7 @@ func (sa *SqliteArchive) Compress(jobs []*schema.Job) {
 func (sa *SqliteArchive) CompressLast(starttime int64) int64 {
 	var lastStr string
 	err := sa.db.QueryRow("SELECT value FROM metadata WHERE key = 'compress_last'").Scan(&lastStr)
-	
+
 	var last int64
 	if err == sql.ErrNoRows {
 		last = starttime
@@ -567,7 +570,8 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
 				reader = gzipReader
 			}
 
-			jobData, err := DecodeJobData(reader, "sqlite")
+			key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
+			jobData, err := DecodeJobData(reader, key)
 			if err != nil {
 				cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err)
 				ch <- JobContainer{Meta: job, Data: nil}
@@ -582,3 +586,32 @@
 
 	return ch
 }
+
+func (sa *SqliteArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	var configBuf bytes.Buffer
+	if err := EncodeCluster(&configBuf, config); err != nil {
+		cclog.Error("SqliteArchive StoreClusterCfg() > encoding error")
+		return err
+	}
+
+	now := time.Now().Unix()
+	_, err := sa.db.Exec(`
+		INSERT INTO clusters (name, config_json, updated_at)
+		VALUES (?, ?, ?)
+		ON CONFLICT(name) DO UPDATE SET
+			config_json = excluded.config_json,
+			updated_at = excluded.updated_at
+	`, name, configBuf.Bytes(), now)
+	if err != nil {
+		cclog.Errorf("SqliteArchive StoreClusterCfg() > insert error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := slices.Contains(sa.clusters, name)
+	if !found {
+		sa.clusters = append(sa.clusters, name)
+	}
+
+	return nil
+}
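
Note on the new StoreClusterCfg: it relies on SQLite's INSERT ... ON CONFLICT upsert, so a repeated write for the same cluster name updates the existing row instead of failing. Below is a minimal standalone sketch of that pattern; the clusters table shape mirrors the patch, but the driver choice (modernc.org/sqlite) and the "fritz" fixture values are illustrative assumptions, not part of the patch.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "modernc.org/sqlite" // assumed driver for the sketch; registers as "sqlite"
)

func main() {
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same shape as the clusters table the patch writes to.
	if _, err := db.Exec(`CREATE TABLE clusters (
		name        TEXT PRIMARY KEY,
		config_json BLOB,
		updated_at  INTEGER
	)`); err != nil {
		log.Fatal(err)
	}

	upsert := `
		INSERT INTO clusters (name, config_json, updated_at)
		VALUES (?, ?, ?)
		ON CONFLICT(name) DO UPDATE SET
			config_json = excluded.config_json,
			updated_at = excluded.updated_at`

	// First Exec inserts; the second hits the name conflict and updates in place.
	for _, cfg := range []string{`{"v":1}`, `{"v":2}`} {
		if _, err := db.Exec(upsert, "fritz", []byte(cfg), time.Now().Unix()); err != nil {
			log.Fatal(err)
		}
	}

	var got string
	if err := db.QueryRow(`SELECT config_json FROM clusters WHERE name = ?`, "fritz").Scan(&got); err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // prints {"v":2}: one row, latest config wins
}

Because the upsert is idempotent per name, StoreClusterCfg can be called for both first-time and repeat registrations without checking for an existing row first; only the in-memory sa.clusters list needs the slices.Contains guard.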