Fix bugs in sqlite backend

2025-12-15 11:23:53 +01:00
parent ab70acd582
commit 6cfed989ff


@@ -14,6 +14,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"slices"
 	"strconv"
 	"text/tabwriter"
 	"time"
@@ -251,6 +252,7 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		cclog.Errorf("SqliteArchive LoadJobData() > query error: %v", err)
 		return nil, err
 	}
+	key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
 
 	var reader io.Reader = bytes.NewReader(dataBlob)
 	if compressed {
@@ -268,10 +270,10 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
 			return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
 		}
-		return DecodeJobData(bytes.NewReader(data), "sqlite")
+		return DecodeJobData(bytes.NewReader(data), key)
 	}
-	return DecodeJobData(reader, "sqlite")
+	return DecodeJobData(reader, key)
 }
 
 func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
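Note on the change above: the second argument to DecodeJobData (and to DecodeJobStats below) was the fixed string "sqlite" and is now a per-job key of the form cluster:jobID:startTime. If that argument acts as a cache key, which the fix suggests, a fixed value would make every job collide on one cache entry and return the first job's data for all of them. A minimal sketch of that failure mode, with a plain map standing in for whatever cache the real decoder keeps (the caching behavior and all names here are assumptions, not code from this repository):

package main

import (
	"fmt"
	"io"
	"strings"
)

// cache is a simplified stand-in for whatever cache the real decoder keeps.
var cache = map[string]string{}

// decode memoizes its result under key. With the old constant key "sqlite",
// the first job decoded would be returned for every job that follows.
func decode(r io.Reader, key string) (string, error) {
	if v, ok := cache[key]; ok {
		return v, nil
	}
	b, err := io.ReadAll(r)
	if err != nil {
		return "", err
	}
	cache[key] = string(b)
	return string(b), nil
}

func main() {
	// Per-job keys in the cluster:jobID:startTime format used by the fix.
	k1 := fmt.Sprintf("%s:%d:%d", "clusterA", 100, 1702640000)
	k2 := fmt.Sprintf("%s:%d:%d", "clusterA", 200, 1702641000)
	a, _ := decode(strings.NewReader("job 100 data"), k1)
	b, _ := decode(strings.NewReader("job 200 data"), k2)
	// Distinct keys yield distinct results; a shared constant key would
	// have returned "job 100 data" both times.
	fmt.Println(a, "|", b)
}

The same per-job key is introduced in LoadJobStats and Iter in the hunks that follow.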
@@ -283,6 +285,7 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
 		cclog.Errorf("SqliteArchive LoadJobStats() > query error: %v", err)
 		return nil, err
 	}
+	key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
 
 	var reader io.Reader = bytes.NewReader(dataBlob)
 	if compressed {
@@ -300,10 +303,10 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
 		if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
 			return nil, fmt.Errorf("validate job data: %v", err)
 		}
-		return DecodeJobStats(bytes.NewReader(data), "sqlite")
+		return DecodeJobStats(bytes.NewReader(data), key)
 	}
-	return DecodeJobStats(reader, "sqlite")
+	return DecodeJobStats(reader, key)
 }
 
 func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
@@ -314,9 +317,11 @@ func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 		return nil, err
 	}
-	if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
-		cclog.Warnf("Validate cluster config: %v\n", err)
-		return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+	if config.Keys.Validate {
+		if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
+			cclog.Warnf("Validate cluster config: %v\n", err)
+			return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
+		}
 	}
 
 	return DecodeCluster(bytes.NewReader(configBlob))
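LoadClusterCfg now consults config.Keys.Validate before running schema validation, turning a previously unconditional check into an opt-in one. A small self-contained sketch of this gate pattern, assuming a global config struct with a boolean Validate key mirroring the names in the diff, and with json.Valid standing in for schema.Validate:

package main

import (
	"encoding/json"
	"fmt"
)

// cfg mimics the global config consulted by the diff; the Keys.Validate
// field name mirrors config.Keys.Validate but is otherwise an assumption.
var cfg = struct{ Keys struct{ Validate bool } }{}

// validate stands in for schema.Validate; here it only checks for valid JSON.
func validate(blob []byte) error {
	if !json.Valid(blob) {
		return fmt.Errorf("not valid JSON")
	}
	return nil
}

func loadClusterCfg(blob []byte) error {
	if cfg.Keys.Validate { // validation is opt-in instead of unconditional
		if err := validate(blob); err != nil {
			return fmt.Errorf("validate cluster config: %w", err)
		}
	}
	return nil // decoding would follow here
}

func main() {
	bad := []byte("{broken")
	fmt.Println(loadClusterCfg(bad)) // <nil>: gate disabled, blob accepted
	cfg.Keys.Validate = true
	fmt.Println(loadClusterCfg(bad)) // error: gate enabled, validation runs
}

With the gate off, configs load without the validation cost; with it on, a malformed blob fails loudly before decoding.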
@@ -337,7 +342,6 @@ func (sa *SqliteArchive) StoreJobMeta(job *schema.Job) error {
 			meta_json = excluded.meta_json,
 			updated_at = excluded.updated_at
 	`, job.JobID, job.Cluster, job.StartTime, metaBuf.Bytes(), now, now)
-
 	if err != nil {
 		cclog.Errorf("SqliteArchive StoreJobMeta() > insert error: %v", err)
 		return err
@@ -367,7 +371,6 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData)
 			data_compressed = excluded.data_compressed,
 			updated_at = excluded.updated_at
 	`, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBuf.Bytes(), now, now)
-
 	if err != nil {
 		cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err)
 		return err
@@ -567,7 +570,8 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
 				reader = gzipReader
 			}
 
-			jobData, err := DecodeJobData(reader, "sqlite")
+			key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
+			jobData, err := DecodeJobData(reader, key)
 			if err != nil {
 				cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err)
 				ch <- JobContainer{Meta: job, Data: nil}
@@ -582,3 +586,32 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
 	return ch
 }
 
+func (sa *SqliteArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
+	var configBuf bytes.Buffer
+	if err := EncodeCluster(&configBuf, config); err != nil {
+		cclog.Error("SqliteArchive StoreClusterCfg() > encoding error")
+		return err
+	}
+
+	now := time.Now().Unix()
+	_, err := sa.db.Exec(`
+		INSERT INTO clusters (name, config_json, updated_at)
+		VALUES (?, ?, ?)
+		ON CONFLICT(name) DO UPDATE SET
+			config_json = excluded.config_json,
+			updated_at = excluded.updated_at
+	`, name, configBuf.Bytes(), now)
+	if err != nil {
+		cclog.Errorf("SqliteArchive StoreClusterCfg() > insert error: %v", err)
+		return err
+	}
+
+	// Update clusters list if new
+	found := slices.Contains(sa.clusters, name)
+	if !found {
+		sa.clusters = append(sa.clusters, name)
+	}
+
+	return nil
+}
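The new StoreClusterCfg relies on SQLite's INSERT ... ON CONFLICT(name) DO UPDATE upsert: the first store of a cluster inserts a row, and every later store overwrites config_json and updated_at in place. A self-contained sketch of the same pattern against an in-memory database (the mattn/go-sqlite3 driver is an assumption; any database/sql SQLite driver works, and ON CONFLICT(name) needs name to be unique, which PRIMARY KEY provides):

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver; any database/sql SQLite driver works
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// name must be unique for ON CONFLICT(name) to apply; PRIMARY KEY provides that.
	if _, err := db.Exec(`CREATE TABLE clusters (
		name TEXT PRIMARY KEY, config_json BLOB, updated_at INTEGER)`); err != nil {
		log.Fatal(err)
	}

	upsert := `INSERT INTO clusters (name, config_json, updated_at)
		VALUES (?, ?, ?)
		ON CONFLICT(name) DO UPDATE SET
			config_json = excluded.config_json,
			updated_at = excluded.updated_at`

	// First call inserts the row, second call updates it in place.
	if _, err := db.Exec(upsert, "fritz", []byte(`{"v":1}`), 1); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(upsert, "fritz", []byte(`{"v":2}`), 2); err != nil {
		log.Fatal(err)
	}

	var cfgJSON []byte
	var ts int64
	if err := db.QueryRow(`SELECT config_json, updated_at FROM clusters WHERE name = ?`,
		"fritz").Scan(&cfgJSON, &ts); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s @ %d\n", cfgJSON, ts) // {"v":2} @ 2: one row, last write wins
}

The trailing slices.Contains check, which is what the new "slices" import in the first hunk supports (Go 1.21+), keeps the in-memory sa.clusters list free of duplicates when the same cluster is stored repeatedly.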