diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index fd1b2f4..4ef9f84 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -14,6 +14,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -86,13 +87,22 @@ func HandleImportFlag(flag string) error { StartTimeUnix: jobMeta.StartTime, } - // TODO: Do loop for new sub structure for stats - // job.LoadAvg = loadJobStat(&jobMeta, "cpu_load") - // job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any") - // job.MemUsedMax = loadJobStat(&jobMeta, "mem_used") - // job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw") - // job.NetBwAvg = loadJobStat(&jobMeta, "net_bw") - // job.FileBwAvg = loadJobStat(&jobMeta, "file_bw") + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + + job.Footprint = make(map[string]float64) + + for _, fp := range sc.Footprint { + job.Footprint[fp] = util.LoadJobStat(&jobMeta, fp) + } + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + log.Warn("Error while marshaling job footprint") + return err + } job.RawResources, err = json.Marshal(job.Resources) if err != nil { diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 0055768..468ebb1 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -60,13 +61,22 @@ func InitDB() error { StartTimeUnix: jobMeta.StartTime, } - // TODO: Convert to loop for new footprint layout - // job.LoadAvg = loadJobStat(jobMeta, "cpu_load") - // job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any") - // job.MemUsedMax = loadJobStat(jobMeta, "mem_used") - // job.MemBwAvg = loadJobStat(jobMeta, "mem_bw") - // job.NetBwAvg = loadJobStat(jobMeta, "net_bw") - // job.FileBwAvg = loadJobStat(jobMeta, "file_bw") + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + job.Footprint = make(map[string]float64) + + for _, fp := range sc.Footprint { + job.Footprint[fp] = util.LoadJobStat(jobMeta, fp) + } + + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + log.Warn("Error while marshaling job footprint") + return err + } job.RawResources, err = json.Marshal(job.Resources) if err != nil { @@ -150,18 +160,6 @@ func SanityChecks(job *schema.BaseJob) error { return nil } -func loadJobStat(job *schema.JobMeta, metric string) float64 { - if stats, ok := job.Statistics[metric]; ok { - if metric == "mem_used" { - return stats.Max - } else { - return stats.Avg - } - } - - return 0.0 -} - func checkJobData(d *schema.JobData) error { for _, scopes := range *d { // var newUnit schema.Unit diff --git a/internal/importer/testdata/cluster-fritz.json b/internal/importer/testdata/cluster-fritz.json index 23a8343..8519fd4 100644 --- a/internal/importer/testdata/cluster-fritz.json +++ b/internal/importer/testdata/cluster-fritz.json @@ -1,746 +1,750 @@ { - "name": "fritz", - "metricConfig": [ - { - "name": "cpu_load", - "unit": { - "base": "" - }, - "scope": "node", - "aggregation": "avg", - "timestep": 60, - "peak": 72, - "normal": 72, - "caution": 36, - "alert": 20 + "name": "fritz", + "metricConfig": [ + { + "name": "cpu_load", + "unit": { + "base": "" + }, + "scope": "node", + "aggregation": "avg", + "footprint": true, + "timestep": 60, + "peak": 72, + "normal": 72, + "caution": 36, + "alert": 20 + }, + { + "name": "cpu_user", + "unit": { + "base": "" + }, + "scope": "hwthread", + "aggregation": "avg", + "timestep": 60, + "peak": 100, + "normal": 50, + "caution": 20, + "alert": 10 + }, + { + "name": "mem_used", + "unit": { + "base": "B", + "prefix": "G" + }, + "scope": "node", + "aggregation": "sum", + "footprint": true, + "timestep": 60, + "peak": 256, + "normal": 128, + "caution": 200, + "alert": 240 + }, + { + "name": "flops_any", + "unit": { + "base": "F/s", + "prefix": "G" + }, + "scope": "hwthread", + "aggregation": "sum", + "footprint": true, + "timestep": 60, + "peak": 5600, + "normal": 1000, + "caution": 200, + "alert": 50 + }, + { + "name": "flops_sp", + "unit": { + "base": "F/s", + "prefix": "G" + }, + "scope": "hwthread", + "aggregation": "sum", + "timestep": 60, + "peak": 5600, + "normal": 1000, + "caution": 200, + "alert": 50 + }, + { + "name": "flops_dp", + "unit": { + "base": "F/s", + "prefix": "G" + }, + "scope": "hwthread", + "aggregation": "sum", + "timestep": 60, + "peak": 2300, + "normal": 500, + "caution": 100, + "alert": 50 + }, + { + "name": "mem_bw", + "unit": { + "base": "B/s", + "prefix": "G" + }, + "scope": "socket", + "aggregation": "sum", + "footprint": true, + "timestep": 60, + "peak": 350, + "normal": 100, + "caution": 50, + "alert": 10 + }, + { + "name": "clock", + "unit": { + "base": "Hz", + "prefix": "M" + }, + "scope": "hwthread", + "aggregation": "avg", + "timestep": 60, + "peak": 3000, + "normal": 2400, + "caution": 1800, + "alert": 1200 + }, + { + "name": "cpu_power", + "unit": { + "base": "W" + }, + "scope": "socket", + "aggregation": "sum", + "timestep": 60, + "peak": 500, + "normal": 250, + "caution": 100, + "alert": 50 + }, + { + "name": "mem_power", + "unit": { + "base": "W" + }, + "scope": "socket", + "aggregation": "sum", + "timestep": 60, + "peak": 100, + "normal": 50, + "caution": 20, + "alert": 10 + }, + { + "name": "ipc", + "unit": { + "base": "IPC" + }, + "scope": "hwthread", + "aggregation": "avg", + "timestep": 60, + "peak": 4, + "normal": 2, + "caution": 1, + "alert": 0.5 + }, + { + "name": "vectorization_ratio", + "unit": { + "base": "" + }, + "scope": "hwthread", + "aggregation": "avg", + "timestep": 60, + "peak": 100, + "normal": 60, + "caution": 40, + "alert": 10 + }, + { + "name": "ib_recv", + "unit": { + "base": "B/s" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 1250000, + "normal": 6000000, + "caution": 200, + "alert": 1 + }, + { + "name": "ib_xmit", + "unit": { + "base": "B/s" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 1250000, + "normal": 6000000, + "caution": 200, + "alert": 1 + }, + { + "name": "ib_recv_pkts", + "unit": { + "base": "" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 6, + "normal": 4, + "caution": 2, + "alert": 1 + }, + { + "name": "ib_xmit_pkts", + "unit": { + "base": "" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 6, + "normal": 4, + "caution": 2, + "alert": 1 + }, + { + "name": "nfs4_read", + "unit": { + "base": "B/s", + "prefix": "M" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 6, + "normal": 4, + "caution": 2, + "alert": 1 + }, + { + "name": "nfs4_write", + "unit": { + "base": "B/s", + "prefix": "M" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 6, + "normal": 4, + "caution": 2, + "alert": 1 + }, + { + "name": "nfs4_total", + "unit": { + "base": "B/s", + "prefix": "M" + }, + "scope": "node", + "aggregation": "sum", + "timestep": 60, + "peak": 6, + "normal": 4, + "caution": 2, + "alert": 1 + } + ], + "subClusters": [ + { + "name": "main", + "nodes": "f01[01-88],f02[01-88],f03[01-88],f03[01-88],f04[01-88],f05[01-88],f06[01-88],f07[01-88],f08[01-88],f09[01-88],f10[01-88],f11[01-56],f12[01-56]", + "processorType": "Intel Icelake", + "socketsPerNode": 2, + "coresPerSocket": 36, + "threadsPerCore": 1, + "flopRateScalar": { + "unit": { + "base": "F/s", + "prefix": "G" }, - { - "name": "cpu_user", - "unit": { - "base": "" - }, - "scope": "hwthread", - "aggregation": "avg", - "timestep": 60, - "peak": 100, - "normal": 50, - "caution": 20, - "alert": 10 + "value": 432 + }, + "flopRateSimd": { + "unit": { + "base": "F/s", + "prefix": "G" }, - { - "name": "mem_used", - "unit": { - "base": "B", - "prefix": "G" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 256, - "normal": 128, - "caution": 200, - "alert": 240 + "value": 9216 + }, + "memoryBandwidth": { + "unit": { + "base": "B/s", + "prefix": "G" }, - { - "name": "flops_any", - "unit": { - "base": "F/s", - "prefix": "G" - }, - "scope": "hwthread", - "aggregation": "sum", - "timestep": 60, - "peak": 5600, - "normal": 1000, - "caution": 200, - "alert": 50 - }, - { - "name": "flops_sp", - "unit": { - "base": "F/s", - "prefix": "G" - }, - "scope": "hwthread", - "aggregation": "sum", - "timestep": 60, - "peak": 5600, - "normal": 1000, - "caution": 200, - "alert": 50 - }, - { - "name": "flops_dp", - "unit": { - "base": "F/s", - "prefix": "G" - }, - "scope": "hwthread", - "aggregation": "sum", - "timestep": 60, - "peak": 2300, - "normal": 500, - "caution": 100, - "alert": 50 - }, - { - "name": "mem_bw", - "unit": { - "base": "B/s", - "prefix": "G" - }, - "scope": "socket", - "aggregation": "sum", - "timestep": 60, - "peak": 350, - "normal": 100, - "caution": 50, - "alert": 10 - }, - { - "name": "clock", - "unit": { - "base": "Hz", - "prefix": "M" - }, - "scope": "hwthread", - "aggregation": "avg", - "timestep": 60, - "peak": 3000, - "normal": 2400, - "caution": 1800, - "alert": 1200 - }, - { - "name": "cpu_power", - "unit": { - "base": "W" - }, - "scope": "socket", - "aggregation": "sum", - "timestep": 60, - "peak": 500, - "normal": 250, - "caution": 100, - "alert": 50 - }, - { - "name": "mem_power", - "unit": { - "base": "W" - }, - "scope": "socket", - "aggregation": "sum", - "timestep": 60, - "peak": 100, - "normal": 50, - "caution": 20, - "alert": 10 - }, - { - "name": "ipc", - "unit": { - "base": "IPC" - }, - "scope": "hwthread", - "aggregation": "avg", - "timestep": 60, - "peak": 4, - "normal": 2, - "caution": 1, - "alert": 0.5 - }, - { - "name": "vectorization_ratio", - "unit": { - "base": "" - }, - "scope": "hwthread", - "aggregation": "avg", - "timestep": 60, - "peak": 100, - "normal": 60, - "caution": 40, - "alert": 10 - }, - { - "name": "ib_recv", - "unit": { - "base": "B/s" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 1250000, - "normal": 6000000, - "caution": 200, - "alert": 1 - }, - { - "name": "ib_xmit", - "unit": { - "base": "B/s" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 1250000, - "normal": 6000000, - "caution": 200, - "alert": 1 - }, - { - "name": "ib_recv_pkts", - "unit": { - "base": "" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 6, - "normal": 4, - "caution": 2, - "alert": 1 - }, - { - "name": "ib_xmit_pkts", - "unit": { - "base": "" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 6, - "normal": 4, - "caution": 2, - "alert": 1 - }, - { - "name": "nfs4_read", - "unit": { - "base": "B/s", - "prefix": "M" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 6, - "normal": 4, - "caution": 2, - "alert": 1 - }, - { - "name": "nfs4_write", - "unit": { - "base": "B/s", - "prefix": "M" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 6, - "normal": 4, - "caution": 2, - "alert": 1 - }, - { - "name": "nfs4_total", - "unit": { - "base": "B/s", - "prefix": "M" - }, - "scope": "node", - "aggregation": "sum", - "timestep": 60, - "peak": 6, - "normal": 4, - "caution": 2, - "alert": 1 - } - ], - "subClusters": [ - { - "name": "main", - "nodes": "f01[01-88],f02[01-88],f03[01-88],f03[01-88],f04[01-88],f05[01-88],f06[01-88],f07[01-88],f08[01-88],f09[01-88],f10[01-88],f11[01-56],f12[01-56]", - "processorType": "Intel Icelake", - "socketsPerNode": 2, - "coresPerSocket": 36, - "threadsPerCore": 1, - "flopRateScalar": { - "unit": { - "base": "F/s", - "prefix": "G" - }, - "value": 432 - }, - "flopRateSimd": { - "unit": { - "base": "F/s", - "prefix": "G" - }, - "value": 9216 - }, - "memoryBandwidth": { - "unit": { - "base": "B/s", - "prefix": "G" - }, - "value": 350 - }, - "topology": { - "node": [ - 0, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 13, - 14, - 15, - 16, - 17, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - 25, - 26, - 27, - 28, - 29, - 30, - 31, - 32, - 33, - 34, - 35, - 36, - 37, - 38, - 39, - 40, - 41, - 42, - 43, - 44, - 45, - 46, - 47, - 48, - 49, - 50, - 51, - 52, - 53, - 54, - 55, - 56, - 57, - 58, - 59, - 60, - 61, - 62, - 63, - 64, - 65, - 66, - 67, - 68, - 69, - 70, - 71 - ], - "socket": [ - [ - 0, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 13, - 14, - 15, - 16, - 17, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - 25, - 26, - 27, - 28, - 29, - 30, - 31, - 32, - 33, - 34, - 35 - ], - [ - 36, - 37, - 38, - 39, - 40, - 41, - 42, - 43, - 44, - 45, - 46, - 47, - 48, - 49, - 50, - 51, - 52, - 53, - 54, - 55, - 56, - 57, - 58, - 59, - 60, - 61, - 62, - 63, - 64, - 65, - 66, - 67, - 68, - 69, - 70, - 71 - ] - ], - "memoryDomain": [ - [ - 0, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 13, - 14, - 15, - 16, - 17 - ], - [ - 18, - 19, - 20, - 21, - 22, - 23, - 24, - 25, - 26, - 27, - 28, - 29, - 30, - 31, - 32, - 33, - 34, - 35 - ], - [ - 36, - 37, - 38, - 39, - 40, - 41, - 42, - 43, - 44, - 45, - 46, - 47, - 48, - 49, - 50, - 51, - 52, - 53 - ], - [ - 54, - 55, - 56, - 57, - 58, - 59, - 60, - 61, - 62, - 63, - 64, - 65, - 66, - 67, - 68, - 69, - 70, - 71 - ] - ], - "core": [ - [ - 0 - ], - [ - 1 - ], - [ - 2 - ], - [ - 3 - ], - [ - 4 - ], - [ - 5 - ], - [ - 6 - ], - [ - 7 - ], - [ - 8 - ], - [ - 9 - ], - [ - 10 - ], - [ - 11 - ], - [ - 12 - ], - [ - 13 - ], - [ - 14 - ], - [ - 15 - ], - [ - 16 - ], - [ - 17 - ], - [ - 18 - ], - [ - 19 - ], - [ - 20 - ], - [ - 21 - ], - [ - 22 - ], - [ - 23 - ], - [ - 24 - ], - [ - 25 - ], - [ - 26 - ], - [ - 27 - ], - [ - 28 - ], - [ - 29 - ], - [ - 30 - ], - [ - 31 - ], - [ - 32 - ], - [ - 33 - ], - [ - 34 - ], - [ - 35 - ], - [ - 36 - ], - [ - 37 - ], - [ - 38 - ], - [ - 39 - ], - [ - 40 - ], - [ - 41 - ], - [ - 42 - ], - [ - 43 - ], - [ - 44 - ], - [ - 45 - ], - [ - 46 - ], - [ - 47 - ], - [ - 48 - ], - [ - 49 - ], - [ - 50 - ], - [ - 51 - ], - [ - 52 - ], - [ - 53 - ], - [ - 54 - ], - [ - 55 - ], - [ - 56 - ], - [ - 57 - ], - [ - 58 - ], - [ - 59 - ], - [ - 60 - ], - [ - 61 - ], - [ - 62 - ], - [ - 63 - ], - [ - 64 - ], - [ - 65 - ], - [ - 66 - ], - [ - 67 - ], - [ - 68 - ], - [ - 69 - ], - [ - 70 - ], - [ - 71 - ] - ] - } - } - ] + "value": 350 + }, + "topology": { + "node": [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35, + 36, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53, + 54, + 55, + 56, + 57, + 58, + 59, + 60, + 61, + 62, + 63, + 64, + 65, + 66, + 67, + 68, + 69, + 70, + 71 + ], + "socket": [ + [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35 + ], + [ + 36, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53, + 54, + 55, + 56, + 57, + 58, + 59, + 60, + 61, + 62, + 63, + 64, + 65, + 66, + 67, + 68, + 69, + 70, + 71 + ] + ], + "memoryDomain": [ + [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17 + ], + [ + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35 + ], + [ + 36, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53 + ], + [ + 54, + 55, + 56, + 57, + 58, + 59, + 60, + 61, + 62, + 63, + 64, + 65, + 66, + 67, + 68, + 69, + 70, + 71 + ] + ], + "core": [ + [ + 0 + ], + [ + 1 + ], + [ + 2 + ], + [ + 3 + ], + [ + 4 + ], + [ + 5 + ], + [ + 6 + ], + [ + 7 + ], + [ + 8 + ], + [ + 9 + ], + [ + 10 + ], + [ + 11 + ], + [ + 12 + ], + [ + 13 + ], + [ + 14 + ], + [ + 15 + ], + [ + 16 + ], + [ + 17 + ], + [ + 18 + ], + [ + 19 + ], + [ + 20 + ], + [ + 21 + ], + [ + 22 + ], + [ + 23 + ], + [ + 24 + ], + [ + 25 + ], + [ + 26 + ], + [ + 27 + ], + [ + 28 + ], + [ + 29 + ], + [ + 30 + ], + [ + 31 + ], + [ + 32 + ], + [ + 33 + ], + [ + 34 + ], + [ + 35 + ], + [ + 36 + ], + [ + 37 + ], + [ + 38 + ], + [ + 39 + ], + [ + 40 + ], + [ + 41 + ], + [ + 42 + ], + [ + 43 + ], + [ + 44 + ], + [ + 45 + ], + [ + 46 + ], + [ + 47 + ], + [ + 48 + ], + [ + 49 + ], + [ + 50 + ], + [ + 51 + ], + [ + 52 + ], + [ + 53 + ], + [ + 54 + ], + [ + 55 + ], + [ + 56 + ], + [ + 57 + ], + [ + 58 + ], + [ + 59 + ], + [ + 60 + ], + [ + 61 + ], + [ + 62 + ], + [ + 63 + ], + [ + 64 + ], + [ + 65 + ], + [ + 66 + ], + [ + 67 + ], + [ + 68 + ], + [ + 69 + ], + [ + 70 + ], + [ + 71 + ] + ] + } + } + ] } diff --git a/internal/repository/job.go b/internal/repository/job.go index da71566..9438edd 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -16,6 +16,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" @@ -64,6 +65,7 @@ var jobColumns []string = []string{ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { job := &schema.Job{} + if err := row.Scan( &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, @@ -79,7 +81,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { job.RawResources = nil if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { - log.Warn("Error while unmarshaling raw footprint json") + log.Warnf("Error while unmarshaling raw footprint json: %v", err) return nil, err } job.RawFootprint = nil @@ -242,6 +244,7 @@ func (r *JobRepository) Find( } log.Debugf("Timer Find %s", time.Since(start)) + return scanJob(q.RunWith(r.stmtCache).QueryRow()) } @@ -397,6 +400,11 @@ func (r *JobRepository) FindConcurrentJobs( // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err) + } + job.RawResources, err = json.Marshal(job.Resources) if err != nil { return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err) @@ -409,10 +417,10 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { res, err := r.DB.NamedExec(`INSERT INTO job ( job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data ) VALUES ( :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data );`, job) if err != nil { return -1, err @@ -478,34 +486,32 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 // Stop updates the job with the database id jobId using the provided arguments. func (r *JobRepository) MarkArchived( - jobId int64, + jobMeta *schema.JobMeta, monitoringStatus int32, - metricStats map[string]schema.JobStatistics, ) error { stmt := sq.Update("job"). Set("monitoring_status", monitoringStatus). - Where("job.id = ?", jobId) + Where("job.id = ?", jobMeta.JobID) - for metric, stats := range metricStats { - switch metric { - case "flops_any": - stmt = stmt.Set("flops_any_avg", stats.Avg) - case "mem_used": - stmt = stmt.Set("mem_used_max", stats.Max) - case "mem_bw": - stmt = stmt.Set("mem_bw_avg", stats.Avg) - case "load": - stmt = stmt.Set("load_avg", stats.Avg) - case "cpu_load": - stmt = stmt.Set("load_avg", stats.Avg) - case "net_bw": - stmt = stmt.Set("net_bw_avg", stats.Avg) - case "file_bw": - stmt = stmt.Set("file_bw_avg", stats.Avg) - default: - log.Debugf("MarkArchived() Metric '%v' unknown", metric) - } + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err } + footprint := make(map[string]float64) + + for _, fp := range sc.Footprint { + footprint[fp] = util.LoadJobStat(jobMeta, fp) + } + + var rawFootprint []byte + + if rawFootprint, err = json.Marshal(footprint); err != nil { + log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) + return err + } + + stmt = stmt.Set("footprint", rawFootprint) if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { log.Warn("Error while marking job as archived") @@ -541,7 +547,7 @@ func (r *JobRepository) archivingWorker() { } // Update the jobs database entry one last time: - if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil { + if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) continue } @@ -786,12 +792,10 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 const NamedJobInsert string = `INSERT INTO job ( job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, - mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data ) VALUES ( :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data, - :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data );` func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) { diff --git a/internal/util/statistics.go b/internal/util/statistics.go index 384de58..9d971dc 100644 --- a/internal/util/statistics.go +++ b/internal/util/statistics.go @@ -4,7 +4,10 @@ // license that can be found in the LICENSE file. package util -import "golang.org/x/exp/constraints" +import ( + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "golang.org/x/exp/constraints" +) func Min[T constraints.Ordered](a, b T) T { if a < b { @@ -19,3 +22,15 @@ func Max[T constraints.Ordered](a, b T) T { } return b } + +func LoadJobStat(job *schema.JobMeta, metric string) float64 { + if stats, ok := job.Statistics[metric]; ok { + if metric == "mem_used" { + return stats.Max + } else { + return stats.Avg + } + } + + return 0.0 +} diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index 78d634a..d60e478 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -51,7 +51,7 @@ func TestInit(t *testing.T) { if version != 1 { t.Fail() } - if len(fsa.clusters) != 3 || fsa.clusters[0] != "emmy" { + if len(fsa.clusters) != 3 || fsa.clusters[1] != "emmy" { t.Fail() } } diff --git a/pkg/schema/job.go b/pkg/schema/job.go index f5363b3..3cfdf55 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -16,8 +16,6 @@ import ( // Common subset of Job and JobMeta. Use one of those, not this type directly. type BaseJob struct { - Footprint map[string]float64 `json:"footPrint"` - MetaData map[string]string `json:"metaData"` Cluster string `json:"cluster" db:"cluster" example:"fritz"` SubCluster string `json:"subCluster" db:"subcluster" example:"main"` Partition string `json:"partition,omitempty" db:"partition" example:"main"` @@ -27,8 +25,10 @@ type BaseJob struct { Tags []*Tag `json:"tags,omitempty"` RawFootprint []byte `json:"-" db:"footprint"` RawMetaData []byte `json:"-" db:"meta_data"` - Resources []*Resource `json:"resources"` RawResources []byte `json:"-" db:"resources"` + Resources []*Resource `json:"resources"` + Footprint map[string]float64 `json:"footPrint"` + MetaData map[string]string `json:"metaData"` ConcurrentJobs JobLinkResultList `json:"concurrentJobs"` Energy float64 `json:"energy"` ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`