This commit is contained in:
Christoph Kluge
2026-01-15 10:12:02 +01:00
32 changed files with 5483 additions and 574 deletions

View File

@@ -80,18 +80,33 @@ import (
)
var (
jobRepoOnce sync.Once
// jobRepoOnce ensures singleton initialization of the JobRepository
jobRepoOnce sync.Once
// jobRepoInstance holds the single instance of JobRepository
jobRepoInstance *JobRepository
)
// JobRepository provides database access for job-related operations.
// It implements the repository pattern to abstract database interactions
// and provides caching for improved performance.
//
// The repository is a singleton initialized via GetJobRepository().
// All database queries use prepared statements via stmtCache for efficiency.
// Frequently accessed data (metadata, energy footprints) is cached in an LRU cache.
type JobRepository struct {
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
driver string
Mutex sync.Mutex
DB *sqlx.DB // Database connection pool
stmtCache *sq.StmtCache // Prepared statement cache for query optimization
cache *lrucache.Cache // LRU cache for metadata and footprint data
driver string // Database driver name (e.g., "sqlite3")
Mutex sync.Mutex // Mutex for thread-safe operations
}
// GetJobRepository returns the singleton instance of JobRepository.
// The repository is initialized lazily on first access with database connection,
// prepared statement cache, and LRU cache configured from repoConfig.
//
// This function is thread-safe and ensures only one instance is created.
// It must be called after Connect() has established a database connection.
func GetJobRepository() *JobRepository {
jobRepoOnce.Do(func() {
db := GetConnection()
@@ -107,6 +122,8 @@ func GetJobRepository() *JobRepository {
return jobRepoInstance
}
// jobColumns defines the standard set of columns selected from the job table.
// Used consistently across all job queries to ensure uniform data retrieval.
var jobColumns []string = []string{
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster",
"job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes",
@@ -115,6 +132,8 @@ var jobColumns []string = []string{
"job.footprint", "job.energy",
}
// jobCacheColumns defines columns from the job_cache table, mirroring jobColumns.
// Used for queries against cached job data for performance optimization.
var jobCacheColumns []string = []string{
"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster",
"job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition",
@@ -124,6 +143,14 @@ var jobCacheColumns []string = []string{
"job_cache.footprint", "job_cache.energy",
}
// scanJob converts a database row into a schema.Job struct.
// It handles JSON unmarshaling of resources and footprint fields,
// and calculates accurate duration for running jobs.
//
// Parameters:
// - row: Database row implementing Scan() interface (sql.Row or sql.Rows)
//
// Returns the populated Job struct or an error if scanning or unmarshaling fails.
func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
job := &schema.Job{}
@@ -186,6 +213,16 @@ func (r *JobRepository) Flush() error {
return nil
}
// FetchMetadata retrieves and unmarshals the metadata JSON for a job.
// Metadata is cached with a 24-hour TTL to improve performance.
//
// The metadata field stores arbitrary key-value pairs associated with a job,
// such as tags, labels, or custom attributes added by external systems.
//
// Parameters:
// - job: Job struct with valid ID field, metadata will be populated in job.MetaData
//
// Returns the metadata map or an error if the job is nil or database query fails.
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
if job == nil {
return nil, fmt.Errorf("job cannot be nil")
@@ -218,6 +255,16 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
return job.MetaData, nil
}
// UpdateMetadata adds or updates a single metadata key-value pair for a job.
// The entire metadata map is re-marshaled and stored, and the cache is invalidated.
// Also triggers archive metadata update via archive.UpdateMetadata.
//
// Parameters:
// - job: Job struct with valid ID, existing metadata will be fetched if not present
// - key: Metadata key to set
// - val: Metadata value to set
//
// Returns an error if the job is nil, metadata fetch fails, or database update fails.
func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) {
if job == nil {
return fmt.Errorf("job cannot be nil")
@@ -228,7 +275,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
if job.MetaData == nil {
if _, err = r.FetchMetadata(job); err != nil {
cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID)
return err
return fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err)
}
}
@@ -243,7 +290,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID)
return err
return fmt.Errorf("failed to marshal metadata for job %d: %w", job.ID, err)
}
if _, err = sq.Update("job").
@@ -251,13 +298,23 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
Where("job.id = ?", job.ID).
RunWith(r.stmtCache).Exec(); err != nil {
cclog.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err
return fmt.Errorf("failed to update metadata in database for job %d: %w", job.ID, err)
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
return archive.UpdateMetadata(job, job.MetaData)
}
// FetchFootprint retrieves and unmarshals the performance footprint JSON for a job.
// Unlike FetchMetadata, footprints are NOT cached as they can be large and change frequently.
//
// The footprint contains summary statistics (avg/min/max) for monitored metrics,
// stored as JSON with keys like "cpu_load_avg", "mem_used_max", etc.
//
// Parameters:
// - job: Job struct with valid ID, footprint will be populated in job.Footprint
//
// Returns the footprint map or an error if the job is nil or database query fails.
func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error) {
if job == nil {
return nil, fmt.Errorf("job cannot be nil")
@@ -284,6 +341,16 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
return job.Footprint, nil
}
// FetchEnergyFootprint retrieves and unmarshals the energy footprint JSON for a job.
// Energy footprints are cached with a 24-hour TTL as they are frequently accessed but rarely change.
//
// The energy footprint contains calculated energy consumption (in kWh) per metric,
// stored as JSON with keys like "power_avg", "acc_power_avg", etc.
//
// Parameters:
// - job: Job struct with valid ID, energy footprint will be populated in job.EnergyFootprint
//
// Returns the energy footprint map or an error if the job is nil or database query fails.
func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) {
if job == nil {
return nil, fmt.Errorf("job cannot be nil")
@@ -316,6 +383,18 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
return job.EnergyFootprint, nil
}
// DeleteJobsBefore removes jobs older than the specified start time.
// Optionally preserves tagged jobs to protect important data from deletion.
// Cache entries for deleted jobs are automatically invalidated.
//
// This is typically used for data retention policies and cleanup operations.
// WARNING: This is a destructive operation that permanently deletes job records.
//
// Parameters:
// - startTime: Unix timestamp, jobs with start_time < this value will be deleted
// - omitTagged: If true, skip jobs that have associated tags (jobtag entries)
//
// Returns the count of deleted jobs or an error if the operation fails.
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
@@ -371,6 +450,13 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
return cnt, err
}
// DeleteJobByID permanently removes a single job by its database ID.
// Cache entries for the deleted job are automatically invalidated.
//
// Parameters:
// - id: Database ID (primary key) of the job to delete
//
// Returns an error if the deletion fails.
func (r *JobRepository) DeleteJobByID(id int64) error {
// Invalidate cache entries before deletion
r.cache.Del(fmt.Sprintf("metadata:%d", id))
@@ -388,6 +474,24 @@ func (r *JobRepository) DeleteJobByID(id int64) error {
return err
}
// FindUserOrProjectOrJobname attempts to interpret a search term as a job ID,
// username, project ID, or job name by querying the database.
//
// Search logic (in priority order):
// 1. If searchterm is numeric, treat as job ID (returned immediately)
// 2. Try exact match in job.hpc_user column (username)
// 3. Try LIKE match in hpc_user.name column (real name)
// 4. Try exact match in job.project column (project ID)
// 5. If no matches, return searchterm as jobname for GraphQL query
//
// This powers the searchbar functionality for flexible job searching.
// Requires authenticated user for database lookups (returns empty if user is nil).
//
// Parameters:
// - user: Authenticated user context, required for database access
// - searchterm: Search string to interpret
//
// Returns up to one non-empty value among (jobid, username, project, jobname).
func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string) {
if searchterm == "" {
return "", "", "", ""
@@ -423,6 +527,19 @@ var (
ErrForbidden = errors.New("not authorized")
)
// FindColumnValue performs a generic column lookup in a database table with role-based access control.
// Only users with admin, support, or manager roles can execute this query.
//
// Parameters:
// - user: User context for authorization check
// - searchterm: Value to search for (exact match or LIKE pattern)
// - table: Database table name to query
// - selectColumn: Column name to return in results
// - whereColumn: Column name to filter on
// - isLike: If true, use LIKE with wildcards; if false, use exact equality
//
// Returns the first matching value, ErrForbidden if user lacks permission,
// or ErrNotFound if no matches are found.
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
if user == nil {
return "", fmt.Errorf("user cannot be nil")
@@ -453,6 +570,19 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta
}
}
// FindColumnValues performs a generic column lookup returning multiple matches with role-based access control.
// Similar to FindColumnValue but returns all matching values instead of just the first.
// Only users with admin, support, or manager roles can execute this query.
//
// Parameters:
// - user: User context for authorization check
// - query: Search pattern (always uses LIKE with wildcards)
// - table: Database table name to query
// - selectColumn: Column name to return in results
// - whereColumn: Column name to filter on
//
// Returns a slice of matching values, ErrForbidden if user lacks permission,
// or ErrNotFound if no matches are found.
func (r *JobRepository) FindColumnValues(user *schema.User, query string, table string, selectColumn string, whereColumn string) (results []string, err error) {
if user == nil {
return nil, fmt.Errorf("user cannot be nil")
@@ -487,6 +617,13 @@ func (r *JobRepository) FindColumnValues(user *schema.User, query string, table
}
}
// Partitions returns a list of distinct cluster partitions for a given cluster.
// Results are cached with a 1-hour TTL to improve performance.
//
// Parameters:
// - cluster: Cluster name to query partitions for
//
// Returns a slice of partition names or an error if the database query fails.
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error
start := time.Now()
@@ -549,7 +686,19 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
return subclusters, nil
}
// FIXME: Set duration to requested walltime?
// StopJobsExceedingWalltimeBy marks running jobs as failed if they exceed their walltime limit.
// This is typically called periodically to clean up stuck or orphaned jobs.
//
// Jobs are marked with:
// - monitoring_status: MonitoringStatusArchivingFailed
// - duration: 0
// - job_state: JobStateFailed
//
// Parameters:
// - seconds: Grace period beyond walltime before marking as failed
//
// Returns an error if the database update fails.
// Logs the number of jobs marked as failed if any were affected.
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
currentTime := time.Now().Unix()
@@ -579,6 +728,12 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}
// FindJobIdsByTag returns all job database IDs associated with a specific tag.
//
// Parameters:
// - tagID: Database ID of the tag to search for
//
// Returns a slice of job IDs or an error if the query fails.
func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error) {
query := sq.Select("job.id").From("job").
Join("jobtag ON jobtag.job_id = job.id").
@@ -606,7 +761,13 @@ func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error) {
return jobIds, nil
}
// FIXME: Reconsider filtering short jobs with harcoded threshold
// FindRunningJobs returns all currently running jobs for a specific cluster.
// Filters out short-running jobs based on repoConfig.MinRunningJobDuration threshold.
//
// Parameters:
// - cluster: Cluster name to filter jobs
//
// Returns a slice of running job objects or an error if the query fails.
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where("job.cluster = ?", cluster).
@@ -634,6 +795,12 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
return jobs, nil
}
// UpdateDuration recalculates and updates the duration field for all running jobs.
// Called periodically to keep job durations current without querying individual jobs.
//
// Duration is calculated as: current_time - job.start_time
//
// Returns an error if the database update fails.
func (r *JobRepository) UpdateDuration() error {
stmnt := sq.Update("job").
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
@@ -648,6 +815,16 @@ func (r *JobRepository) UpdateDuration() error {
return nil
}
// FindJobsBetween returns jobs within a specified time range.
// If startTimeBegin is 0, returns all jobs before startTimeEnd.
// Optionally excludes tagged jobs from results.
//
// Parameters:
// - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
// - startTimeEnd: Unix timestamp for range end
// - omitTagged: If true, exclude jobs with associated tags
//
// Returns a slice of jobs or an error if the time range is invalid or query fails.
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
var query sq.SelectBuilder
@@ -688,6 +865,14 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
return jobs, nil
}
// UpdateMonitoringStatus updates the monitoring status for a job and invalidates its cache entries.
// Cache invalidation affects both metadata and energy footprint to ensure consistency.
//
// Parameters:
// - job: Database ID of the job to update
// - monitoringStatus: New monitoring status value (see schema.MonitoringStatus constants)
//
// Returns an error if the database update fails.
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
// Invalidate cache entries as monitoring status affects job state
r.cache.Del(fmt.Sprintf("metadata:%d", job))
@@ -704,6 +889,13 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
return nil
}
// Execute runs a Squirrel UpdateBuilder statement against the database.
// This is a generic helper for executing pre-built update queries.
//
// Parameters:
// - stmt: Squirrel UpdateBuilder with prepared update query
//
// Returns an error if the execution fails.
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
cclog.Errorf("Error while executing statement: %v", err)
@@ -713,6 +905,14 @@ func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
return nil
}
// MarkArchived adds monitoring status update to an existing UpdateBuilder statement.
// This is a builder helper used when constructing multi-field update queries.
//
// Parameters:
// - stmt: Existing UpdateBuilder to modify
// - monitoringStatus: Monitoring status value to set
//
// Returns the modified UpdateBuilder for method chaining.
func (r *JobRepository) MarkArchived(
stmt sq.UpdateBuilder,
monitoringStatus int32,
@@ -720,11 +920,22 @@ func (r *JobRepository) MarkArchived(
return stmt.Set("monitoring_status", monitoringStatus)
}
// UpdateEnergy calculates and updates the energy consumption for a job.
// This is called for running jobs during intermediate updates or when archiving.
//
// Energy calculation formula:
// - For "power" metrics: Energy (kWh) = (Power_avg * NumNodes * Duration_hours) / 1000
// - For "energy" metrics: Currently not implemented (would need sum statistics)
//
// The calculation accounts for:
// - Multi-node jobs: Multiplies by NumNodes to get total cluster energy
// - Shared jobs: Node average is already based on partial resources, so NumNodes=1
// - Unit conversion: Watts * hours / 1000 = kilowatt-hours (kWh)
// - Rounding: Results rounded to 2 decimal places
func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder,
jobMeta *schema.Job,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
cclog.Errorf("cannot get subcluster: %s", err.Error())
@@ -732,25 +943,27 @@ func (r *JobRepository) UpdateEnergy(
}
energyFootprint := make(map[string]float64)
// Total Job Energy Outside Loop
// Accumulate total energy across all energy-related metrics
totalEnergy := 0.0
for _, fp := range sc.EnergyFootprint {
// Always Init Metric Energy Inside Loop
// Calculate energy for this specific metric
metricEnergy := 0.0
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
// Note: For DB data, calculate and save as kWh
switch sc.MetricConfig[i].Energy {
case "energy": // this metric has energy as unit (Joules or Wh)
case "energy": // Metric already in energy units (Joules or Wh)
cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp)
// FIXME: Needs sum as stats type
case "power": // this metric has power as unit (Watt)
// Energy: Power (in Watts) * Time (in Seconds)
// Unit: (W * (s / 3600)) / 1000 = kWh
// Round 2 Digits: round(Energy * 100) / 100
// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
// FIXME: Needs sum as stats type to accumulate energy values over time
case "power": // Metric in power units (Watts)
// Energy (kWh) = Power (W) × Time (h) / 1000
// Formula: (avg_power_per_node * num_nodes) * (duration_sec / 3600) / 1000
//
// Breakdown:
// LoadJobStat(jobMeta, fp, "avg") = average power per node (W)
// jobMeta.NumNodes = number of nodes (1 for shared jobs)
// jobMeta.Duration / 3600.0 = duration in hours
// / 1000.0 = convert Wh to kWh
rawEnergy := ((LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.NumNodes)) * (float64(jobMeta.Duration) / 3600.0)) / 1000.0
metricEnergy = math.Round(rawEnergy*100.0) / 100.0
metricEnergy = math.Round(rawEnergy*100.0) / 100.0 // Round to 2 decimal places
}
} else {
cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
@@ -758,8 +971,6 @@ func (r *JobRepository) UpdateEnergy(
energyFootprint[fp] = metricEnergy
totalEnergy += metricEnergy
// cclog.Infof("Metric %s Average %f -> %f kWh | Job %d Total -> %f kWh", fp, LoadJobStat(jobMeta, fp, "avg"), energy, jobMeta.JobID, totalEnergy)
}
var rawFootprint []byte
@@ -771,11 +982,19 @@ func (r *JobRepository) UpdateEnergy(
return stmt.Set("energy_footprint", string(rawFootprint)).Set("energy", (math.Round(totalEnergy*100.0) / 100.0)), nil
}
// UpdateFootprint calculates and updates the performance footprint for a job.
// This is called for running jobs during intermediate updates or when archiving.
//
// A footprint is a summary statistic (avg/min/max) for each monitored metric.
// The specific statistic type is defined in the cluster config's Footprint field.
// Results are stored as JSON with keys like "metric_avg", "metric_max", etc.
//
// Example: For a "cpu_load" metric with Footprint="avg", this stores
// the average CPU load across all nodes as "cpu_load_avg": 85.3
func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder,
jobMeta *schema.Job,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
cclog.Errorf("cannot get subcluster: %s", err.Error())
@@ -783,7 +1002,10 @@ func (r *JobRepository) UpdateFootprint(
}
footprint := make(map[string]float64)
// Build footprint map with metric_stattype as keys
for _, fp := range sc.Footprint {
// Determine which statistic to use: avg, min, or max
// First check global metric config, then cluster-specific config
var statType string
for _, gm := range archive.GlobalMetricList {
if gm.Name == fp {
@@ -791,15 +1013,18 @@ func (r *JobRepository) UpdateFootprint(
}
}
// Validate statistic type
if statType != "avg" && statType != "min" && statType != "max" {
cclog.Warnf("unknown statType for footprint update: %s", statType)
return stmt, fmt.Errorf("unknown statType for footprint update: %s", statType)
}
// Override with cluster-specific config if available
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
statType = sc.MetricConfig[i].Footprint
}
// Store as "metric_stattype": value (e.g., "cpu_load_avg": 85.3)
name := fmt.Sprintf("%s_%s", fp, statType)
footprint[name] = LoadJobStat(jobMeta, fp, statType)
}