mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-27 05:49:04 +01:00
continue working on non-node scoped metrics
This commit is contained in:
parent
1c6ab3d062
commit
b7432fca5f
10
init-db.go
10
init-db.go
@ -88,7 +88,15 @@ func initDB(db *sqlx.DB, archive string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt, err := tx.PrepareNamed(schema.JobInsertStmt)
|
stmt, err := tx.PrepareNamed(`INSERT INTO job (
|
||||||
|
job_id, user, project, cluster, partition, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||||
|
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
|
||||||
|
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
|
||||||
|
) VALUES (
|
||||||
|
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||||
|
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
|
||||||
|
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
||||||
|
);`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func calcStatisticsSeries(job *schema.Job, jobData schema.JobData) error {
|
|||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
sum, smin, smax := schema.Float(0.), math.MaxFloat32, -math.MaxFloat32
|
sum, smin, smax := schema.Float(0.), math.MaxFloat32, -math.MaxFloat32
|
||||||
for _, series := range jobMetric.Series {
|
for _, series := range jobMetric.Series {
|
||||||
if len(series.Data) >= i {
|
if i >= len(series.Data) {
|
||||||
sum, smin, smax = schema.NaN, math.NaN(), math.NaN()
|
sum, smin, smax = schema.NaN, math.NaN(), math.NaN()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -258,9 +258,9 @@ func calcStatisticsSeries(job *schema.Job, jobData schema.JobData) error {
|
|||||||
max[i] = schema.Float(smax)
|
max[i] = schema.Float(smax)
|
||||||
}
|
}
|
||||||
|
|
||||||
jobMetric.StatisticsSeries.Mean = mean
|
jobMetric.StatisticsSeries = &schema.StatsSeries{
|
||||||
jobMetric.StatisticsSeries.Min = min
|
Min: min, Mean: mean, Max: max,
|
||||||
jobMetric.StatisticsSeries.Max = max
|
}
|
||||||
jobMetric.Series = nil
|
jobMetric.Series = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@ -81,6 +82,7 @@ func (ccms *CCMetricStore) doRequest(job *schema.Job, suffix string, metrics []s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
|
func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
|
||||||
|
// log.Printf("job: %#v", job)
|
||||||
|
|
||||||
type ApiQuery struct {
|
type ApiQuery struct {
|
||||||
Metric string `json:"metric"`
|
Metric string `json:"metric"`
|
||||||
@ -106,7 +108,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
|
|||||||
reqBody := ApiQueryRequest{
|
reqBody := ApiQueryRequest{
|
||||||
Cluster: job.Cluster,
|
Cluster: job.Cluster,
|
||||||
From: job.StartTime.Unix(),
|
From: job.StartTime.Unix(),
|
||||||
To: job.StartTime.Add(time.Duration(job.Duration)).Unix(),
|
To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(),
|
||||||
Queries: make([]ApiQuery, 0),
|
Queries: make([]ApiQuery, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,12 +120,20 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
|
|||||||
scopeForMetric := map[string]schema.MetricScope{}
|
scopeForMetric := map[string]schema.MetricScope{}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
mc := config.GetMetricConfig(job.Cluster, metric)
|
mc := config.GetMetricConfig(job.Cluster, metric)
|
||||||
|
if mc == nil {
|
||||||
|
// return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
|
||||||
|
log.Printf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
nativeScope, requestedScope := mc.Scope, scopes[0]
|
nativeScope, requestedScope := mc.Scope, scopes[0]
|
||||||
|
|
||||||
// case 1: A metric is requested at node scope with a native scope of node as well
|
// case 1: A metric is requested at node scope with a native scope of node as well
|
||||||
// case 2: A metric is requested at node scope and node is exclusive
|
// case 2: A metric is requested at node scope and node is exclusive
|
||||||
|
// case 3: A metric has native scope node
|
||||||
if (nativeScope == requestedScope && nativeScope == schema.MetricScopeNode) ||
|
if (nativeScope == requestedScope && nativeScope == schema.MetricScopeNode) ||
|
||||||
(job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) {
|
(job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) ||
|
||||||
|
(nativeScope == schema.MetricScopeNode) {
|
||||||
nodes := map[string]bool{}
|
nodes := map[string]bool{}
|
||||||
for _, resource := range job.Resources {
|
for _, resource := range job.Resources {
|
||||||
nodes[resource.Hostname] = true
|
nodes[resource.Hostname] = true
|
||||||
@ -188,6 +198,8 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
|
|||||||
panic("todo")
|
panic("todo")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// log.Printf("query: %#v", reqBody)
|
||||||
|
|
||||||
buf := &bytes.Buffer{}
|
buf := &bytes.Buffer{}
|
||||||
if err := json.NewEncoder(buf).Encode(reqBody); err != nil {
|
if err := json.NewEncoder(buf).Encode(reqBody); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -213,9 +225,16 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// log.Printf("response: %#v", resBody)
|
||||||
|
|
||||||
var jobData schema.JobData = make(schema.JobData)
|
var jobData schema.JobData = make(schema.JobData)
|
||||||
for _, res := range resBody {
|
for _, res := range resBody {
|
||||||
|
|
||||||
metric := res.Query.Metric
|
metric := res.Query.Metric
|
||||||
|
if _, ok := jobData[metric]; !ok {
|
||||||
|
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||||
|
}
|
||||||
|
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error)
|
return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error)
|
||||||
}
|
}
|
||||||
@ -239,6 +258,14 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
|
|||||||
*id, _ = strconv.Atoi(res.Query.TypeIds[0])
|
*id, _ = strconv.Atoi(res.Query.TypeIds[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
|
||||||
|
// TODO: use schema.Float instead of float64?
|
||||||
|
// This is done because regular float64 can not be JSONed when NaN.
|
||||||
|
res.Avg = schema.Float(0)
|
||||||
|
res.Min = schema.Float(0)
|
||||||
|
res.Max = schema.Float(0)
|
||||||
|
}
|
||||||
|
|
||||||
jobMetric.Series = append(jobMetric.Series, schema.Series{
|
jobMetric.Series = append(jobMetric.Series, schema.Series{
|
||||||
Hostname: res.Query.Hostname,
|
Hostname: res.Query.Hostname,
|
||||||
Id: id,
|
Id: id,
|
||||||
|
@ -63,7 +63,13 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
|
|||||||
return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
|
return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
return repo.LoadData(job, metrics, scopes, ctx)
|
data, err := repo.LoadData(job, metrics, scopes, ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
calcStatisticsSeries(job, data)
|
||||||
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := loadFromArchive(job)
|
data, err := loadFromArchive(job)
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
// Common subset of Job and JobMeta. Use one of those, not
|
// Common subset of Job and JobMeta. Use one of those, not
|
||||||
// this type directly.
|
// this type directly.
|
||||||
type BaseJob struct {
|
type BaseJob struct {
|
||||||
ID int64 `json:"id" db:"id"`
|
|
||||||
JobID int64 `json:"jobId" db:"job_id"`
|
JobID int64 `json:"jobId" db:"job_id"`
|
||||||
User string `json:"user" db:"user"`
|
User string `json:"user" db:"user"`
|
||||||
Project string `json:"project" db:"project"`
|
Project string `json:"project" db:"project"`
|
||||||
@ -27,14 +26,15 @@ type BaseJob struct {
|
|||||||
State JobState `json:"jobState" db:"job_state"`
|
State JobState `json:"jobState" db:"job_state"`
|
||||||
Duration int32 `json:"duration" db:"duration"`
|
Duration int32 `json:"duration" db:"duration"`
|
||||||
Tags []*Tag `json:"tags"`
|
Tags []*Tag `json:"tags"`
|
||||||
|
RawResources []byte `json:"-" db:"resources"`
|
||||||
Resources []*Resource `json:"resources"`
|
Resources []*Resource `json:"resources"`
|
||||||
MetaData interface{} `json:"metaData" db:"meta_data"`
|
MetaData interface{} `json:"metaData" db:"meta_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// This type is used as the GraphQL interface and using sqlx as a table row.
|
// This type is used as the GraphQL interface and using sqlx as a table row.
|
||||||
type Job struct {
|
type Job struct {
|
||||||
|
ID int64 `json:"id" db:"id"`
|
||||||
BaseJob
|
BaseJob
|
||||||
RawResources []byte `json:"-" db:"resources"`
|
|
||||||
StartTime time.Time `json:"startTime" db:"start_time"`
|
StartTime time.Time `json:"startTime" db:"start_time"`
|
||||||
MemUsedMax float64 `json:"-" db:"mem_used_max"`
|
MemUsedMax float64 `json:"-" db:"mem_used_max"`
|
||||||
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"`
|
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"`
|
||||||
@ -52,7 +52,7 @@ type Job struct {
|
|||||||
// the StartTime field with one of type int64.
|
// the StartTime field with one of type int64.
|
||||||
type JobMeta struct {
|
type JobMeta struct {
|
||||||
BaseJob
|
BaseJob
|
||||||
StartTime int64 `json:"startTime"`
|
StartTime int64 `json:"startTime" db:"start_time"`
|
||||||
Statistics map[string]JobStatistics `json:"statistics,omitempty"`
|
Statistics map[string]JobStatistics `json:"statistics,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,16 +68,6 @@ var JobColumns []string = []string{
|
|||||||
"job.duration", "job.resources", "job.meta_data",
|
"job.duration", "job.resources", "job.meta_data",
|
||||||
}
|
}
|
||||||
|
|
||||||
const JobInsertStmt string = `INSERT INTO job (
|
|
||||||
job_id, user, project, cluster, partition, array_job_id, num_nodes, num_hwthreads, num_acc,
|
|
||||||
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
|
|
||||||
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
|
|
||||||
) VALUES (
|
|
||||||
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
|
||||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
|
|
||||||
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
|
||||||
);`
|
|
||||||
|
|
||||||
type Scannable interface {
|
type Scannable interface {
|
||||||
StructScan(dest interface{}) error
|
StructScan(dest interface{}) error
|
||||||
}
|
}
|
||||||
@ -85,7 +75,7 @@ type Scannable interface {
|
|||||||
// Helper function for scanning jobs with the `jobTableCols` columns selected.
|
// Helper function for scanning jobs with the `jobTableCols` columns selected.
|
||||||
func ScanJob(row Scannable) (*Job, error) {
|
func ScanJob(row Scannable) (*Job, error) {
|
||||||
job := &Job{BaseJob: JobDefaults}
|
job := &Job{BaseJob: JobDefaults}
|
||||||
if err := row.StructScan(&job); err != nil {
|
if err := row.StructScan(job); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,6 +87,7 @@ func ScanJob(row Scannable) (*Job, error) {
|
|||||||
job.Duration = int32(time.Since(job.StartTime).Seconds())
|
job.Duration = int32(time.Since(job.StartTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
job.RawResources = nil
|
||||||
return job, nil
|
return job, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user