From b8fdfc30c06766651a39774fbc9685fb64b5c71f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 17 Dec 2025 10:12:49 +0100 Subject: [PATCH] Fix performance bugs in sqlite archive backend --- internal/importer/initDB.go | 14 ++++---- pkg/archive/sqliteBackend.go | 52 ++++++++++++++++++++++------ pkg/archive/sqliteBackend_test.go | 57 ++++++++++++++++++++++++++++++- 3 files changed, 105 insertions(+), 18 deletions(-) diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index a478957..12f4901 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -111,18 +111,22 @@ func InitDB() error { continue } - id, err := r.TransactionAddNamed(t, + id, jobErr := r.TransactionAddNamed(t, repository.NamedJobInsert, jobMeta) - if err != nil { - cclog.Errorf("repository initDB(): %v", err) + if jobErr != nil { + cclog.Errorf("repository initDB(): %v", jobErr) errorOccured++ continue } + // Job successfully inserted, increment counter + i += 1 + for _, tag := range jobMeta.Tags { tagstr := tag.Name + ":" + tag.Type tagID, ok := tags[tagstr] if !ok { + var err error tagID, err = r.TransactionAdd(t, addTagQuery, tag.Name, tag.Type) @@ -138,10 +142,6 @@ func InitDB() error { setTagQuery, id, tagID) } - - if err == nil { - i += 1 - } } if errorOccured > 0 { diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go index 6fa188b..0b7a22d 100644 --- a/pkg/archive/sqliteBackend.go +++ b/pkg/archive/sqliteBackend.go @@ -61,6 +61,7 @@ CREATE TABLE IF NOT EXISTS jobs ( CREATE INDEX IF NOT EXISTS idx_jobs_cluster ON jobs(cluster); CREATE INDEX IF NOT EXISTS idx_jobs_start_time ON jobs(start_time); +CREATE INDEX IF NOT EXISTS idx_jobs_order ON jobs(cluster, start_time); CREATE INDEX IF NOT EXISTS idx_jobs_lookup ON jobs(cluster, job_id, start_time); CREATE TABLE IF NOT EXISTS clusters ( @@ -560,11 +561,15 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer { go func() { defer close(ch) - rows, err := sa.db.Query("SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time") - if err != nil { - cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error()) + const chunkSize = 1000 + offset := 0 + + var query string + if loadMetricData { + query = "SELECT meta_json, data_json, data_compressed FROM jobs ORDER BY cluster, start_time LIMIT ? OFFSET ?" + } else { + query = "SELECT meta_json FROM jobs ORDER BY cluster, start_time LIMIT ? OFFSET ?" } - defer rows.Close() numWorkers := 4 jobRows := make(chan sqliteJobRow, numWorkers*2) @@ -615,13 +620,40 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer { }() } - for rows.Next() { - var row sqliteJobRow - if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil { - cclog.Errorf("SqliteArchive Iter() > scan error: %v", err) - continue + for { + rows, err := sa.db.Query(query, chunkSize, offset) + if err != nil { + cclog.Fatalf("SqliteArchive Iter() > query error: %s", err.Error()) } - jobRows <- row + + rowCount := 0 + for rows.Next() { + var row sqliteJobRow + + if loadMetricData { + if err := rows.Scan(&row.metaBlob, &row.dataBlob, &row.compressed); err != nil { + cclog.Errorf("SqliteArchive Iter() > scan error: %v", err) + continue + } + } else { + if err := rows.Scan(&row.metaBlob); err != nil { + cclog.Errorf("SqliteArchive Iter() > scan error: %v", err) + continue + } + row.dataBlob = nil + row.compressed = false + } + + jobRows <- row + rowCount++ + } + rows.Close() + + if rowCount < chunkSize { + break + } + + offset += chunkSize } close(jobRows) diff --git a/pkg/archive/sqliteBackend_test.go b/pkg/archive/sqliteBackend_test.go index 285055f..b72b8f6 100644 --- a/pkg/archive/sqliteBackend_test.go +++ b/pkg/archive/sqliteBackend_test.go @@ -294,7 +294,7 @@ func TestSqliteCompress(t *testing.T) { // Compress should not panic even with missing data sa.Compress([]*schema.Job{job}) - + t.Log("Compression method verified") } @@ -311,3 +311,58 @@ func TestSqliteConfigParsing(t *testing.T) { t.Errorf("expected dbPath '/tmp/test.db', got '%s'", cfg.DBPath) } } + +func TestSqliteIterChunking(t *testing.T) { + tmpfile := t.TempDir() + "/test.db" + defer os.Remove(tmpfile) + + var sa SqliteArchive + _, err := sa.Init(json.RawMessage(`{"dbPath":"` + tmpfile + `"}`)) + if err != nil { + t.Fatalf("init failed: %v", err) + } + defer sa.db.Close() + + const totalJobs = 2500 + for i := 1; i <= totalJobs; i++ { + job := &schema.Job{ + JobID: int64(i), + Cluster: "test", + StartTime: int64(i * 1000), + NumNodes: 1, + Resources: []*schema.Resource{{Hostname: "node001"}}, + } + if err := sa.StoreJobMeta(job); err != nil { + t.Fatalf("store failed: %v", err) + } + } + + t.Run("IterWithoutData", func(t *testing.T) { + count := 0 + for container := range sa.Iter(false) { + if container.Meta == nil { + t.Error("expected non-nil meta") + } + if container.Data != nil { + t.Error("expected nil data when loadMetricData is false") + } + count++ + } + if count != totalJobs { + t.Errorf("expected %d jobs, got %d", totalJobs, count) + } + }) + + t.Run("IterWithData", func(t *testing.T) { + count := 0 + for container := range sa.Iter(true) { + if container.Meta == nil { + t.Error("expected non-nil meta") + } + count++ + } + if count != totalJobs { + t.Errorf("expected %d jobs, got %d", totalJobs, count) + } + }) +}