mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-12-19 05:36:17 +01:00
Fix performance bugs in sqlite archive backend
This commit is contained in:
@@ -111,18 +111,22 @@ func InitDB() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := r.TransactionAddNamed(t,
|
id, jobErr := r.TransactionAddNamed(t,
|
||||||
repository.NamedJobInsert, jobMeta)
|
repository.NamedJobInsert, jobMeta)
|
||||||
if err != nil {
|
if jobErr != nil {
|
||||||
cclog.Errorf("repository initDB(): %v", err)
|
cclog.Errorf("repository initDB(): %v", jobErr)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Job successfully inserted, increment counter
|
||||||
|
i += 1
|
||||||
|
|
||||||
for _, tag := range jobMeta.Tags {
|
for _, tag := range jobMeta.Tags {
|
||||||
tagstr := tag.Name + ":" + tag.Type
|
tagstr := tag.Name + ":" + tag.Type
|
||||||
tagID, ok := tags[tagstr]
|
tagID, ok := tags[tagstr]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
var err error
|
||||||
tagID, err = r.TransactionAdd(t,
|
tagID, err = r.TransactionAdd(t,
|
||||||
addTagQuery,
|
addTagQuery,
|
||||||
tag.Name, tag.Type)
|
tag.Name, tag.Type)
|
||||||
@@ -138,10 +142,6 @@ func InitDB() error {
|
|||||||
setTagQuery,
|
setTagQuery,
|
||||||
id, tagID)
|
id, tagID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
i += 1
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if errorOccured > 0 {
|
if errorOccured > 0 {
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ CREATE TABLE IF NOT EXISTS jobs (
|
|||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_jobs_cluster ON jobs(cluster);
|
CREATE INDEX IF NOT EXISTS idx_jobs_cluster ON jobs(cluster);
|
||||||
CREATE INDEX IF NOT EXISTS idx_jobs_start_time ON jobs(start_time);
|
CREATE INDEX IF NOT EXISTS idx_jobs_start_time ON jobs(start_time);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_order ON jobs(cluster, start_time);
|
||||||
CREATE INDEX IF NOT EXISTS idx_jobs_lookup ON jobs(cluster, job_id, start_time);
|
CREATE INDEX IF NOT EXISTS idx_jobs_lookup ON jobs(cluster, job_id, start_time);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS clusters (
|
CREATE TABLE IF NOT EXISTS clusters (
|
||||||
@@ -560,11 +561,15 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
|||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
|
|
||||||
rows, err := sa.db.Query("SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time")
|
const chunkSize = 1000
|
||||||
if err != nil {
|
offset := 0
|
||||||
cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error())
|
|
||||||
|
var query string
|
||||||
|
if loadMetricData {
|
||||||
|
query = "SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time LIMIT ? OFFSET ?"
|
||||||
|
} else {
|
||||||
|
query = "SELECT meta_json FROM jobs ORDER BY cluster, start_time LIMIT ? OFFSET ?"
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
numWorkers := 4
|
numWorkers := 4
|
||||||
jobRows := make(chan sqliteJobRow, numWorkers*2)
|
jobRows := make(chan sqliteJobRow, numWorkers*2)
|
||||||
@@ -615,13 +620,40 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
for rows.Next() {
|
for {
|
||||||
var row sqliteJobRow
|
rows, err := sa.db.Query(query, chunkSize, offset)
|
||||||
if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
|
cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error())
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
jobRows <- row
|
|
||||||
|
rowCount := 0
|
||||||
|
for rows.Next() {
|
||||||
|
var row sqliteJobRow
|
||||||
|
|
||||||
|
if loadMetricData {
|
||||||
|
if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil {
|
||||||
|
cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := rows.Scan(&row.metaBlob); err != nil {
|
||||||
|
cclog.Errorf("SqliteArchive Iter() > scan error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
row.dataBlob = nil
|
||||||
|
row.compressed = false
|
||||||
|
}
|
||||||
|
|
||||||
|
jobRows <- row
|
||||||
|
rowCount++
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
|
||||||
|
if rowCount < chunkSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += chunkSize
|
||||||
}
|
}
|
||||||
|
|
||||||
close(jobRows)
|
close(jobRows)
|
||||||
|
|||||||
@@ -294,7 +294,7 @@ func TestSqliteCompress(t *testing.T) {
|
|||||||
|
|
||||||
// Compress should not panic even with missing data
|
// Compress should not panic even with missing data
|
||||||
sa.Compress([]*schema.Job{job})
|
sa.Compress([]*schema.Job{job})
|
||||||
|
|
||||||
t.Log("Compression method verified")
|
t.Log("Compression method verified")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -311,3 +311,58 @@ func TestSqliteConfigParsing(t *testing.T) {
|
|||||||
t.Errorf("expected dbPath '/tmp/test.db', got '%s'", cfg.DBPath)
|
t.Errorf("expected dbPath '/tmp/test.db', got '%s'", cfg.DBPath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSqliteIterChunking(t *testing.T) {
|
||||||
|
tmpfile := t.TempDir() + "/test.db"
|
||||||
|
defer os.Remove(tmpfile)
|
||||||
|
|
||||||
|
var sa SqliteArchive
|
||||||
|
_, err := sa.Init(json.RawMessage(`{"dbPath":"` + tmpfile + `"}`))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("init failed: %v", err)
|
||||||
|
}
|
||||||
|
defer sa.db.Close()
|
||||||
|
|
||||||
|
const totalJobs = 2500
|
||||||
|
for i := 1; i <= totalJobs; i++ {
|
||||||
|
job := &schema.Job{
|
||||||
|
JobID: int64(i),
|
||||||
|
Cluster: "test",
|
||||||
|
StartTime: int64(i * 1000),
|
||||||
|
NumNodes: 1,
|
||||||
|
Resources: []*schema.Resource{{Hostname: "node001"}},
|
||||||
|
}
|
||||||
|
if err := sa.StoreJobMeta(job); err != nil {
|
||||||
|
t.Fatalf("store failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("IterWithoutData", func(t *testing.T) {
|
||||||
|
count := 0
|
||||||
|
for container := range sa.Iter(false) {
|
||||||
|
if container.Meta == nil {
|
||||||
|
t.Error("expected non-nil meta")
|
||||||
|
}
|
||||||
|
if container.Data != nil {
|
||||||
|
t.Error("expected nil data when loadMetricData is false")
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count != totalJobs {
|
||||||
|
t.Errorf("expected %d jobs, got %d", totalJobs, count)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("IterWithData", func(t *testing.T) {
|
||||||
|
count := 0
|
||||||
|
for container := range sa.Iter(true) {
|
||||||
|
if container.Meta == nil {
|
||||||
|
t.Error("expected non-nil meta")
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count != totalJobs {
|
||||||
|
t.Errorf("expected %d jobs, got %d", totalJobs, count)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user