diff --git a/init-db.go b/init-db.go
index 0c94fe4..502c5f5 100644
--- a/init-db.go
+++ b/init-db.go
@@ -36,8 +36,8 @@ const JOBS_DB_SCHEMA string = `
 	num_nodes         INT NOT NULL,
 	num_hwthreads     INT NOT NULL,
 	num_acc           INT NOT NULL,
-	smt             TINYINT CHECK(smt IN (0, 1 )) NOT NULL DEFAULT 1,
-	exclusive       TINYINT CHECK(exclusive IN (0, 1, 2)) NOT NULL DEFAULT 1,
+	smt               TINYINT CHECK(smt IN (0, 1 )) NOT NULL DEFAULT 1,
+	exclusive         TINYINT CHECK(exclusive IN (0, 1, 2)) NOT NULL DEFAULT 1,
 	monitoring_status TINYINT CHECK(monitoring_status IN (0, 1 )) NOT NULL DEFAULT 1,
 
 	mem_used_max REAL NOT NULL DEFAULT 0.0,
@@ -88,7 +88,15 @@ func initDB(db *sqlx.DB, archive string) error {
 		return err
 	}
 
-	stmt, err := tx.PrepareNamed(schema.JobInsertStmt)
+	stmt, err := tx.PrepareNamed(`INSERT INTO job (
+		job_id, user, project, cluster, partition, array_job_id, num_nodes, num_hwthreads, num_acc,
+		exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
+		mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
+	) VALUES (
+		:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+		:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
+		:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
+	);`)
 	if err != nil {
 		return err
 	}
diff --git a/metricdata/archive.go b/metricdata/archive.go
index 4ef6e6c..53f87b1 100644
--- a/metricdata/archive.go
+++ b/metricdata/archive.go
@@ -243,7 +243,7 @@ func calcStatisticsSeries(job *schema.Job, jobData schema.JobData) error {
 			for i := 0; i < n; i++ {
 				sum, smin, smax := schema.Float(0.), math.MaxFloat32, -math.MaxFloat32
 				for _, series := range jobMetric.Series {
-					if len(series.Data) >= i {
+					if i >= len(series.Data) {
 						sum, smin, smax = schema.NaN, math.NaN(), math.NaN()
 						break
 					}
@@ -258,9 +258,9 @@
 				max[i] = schema.Float(smax)
 			}
 
-			jobMetric.StatisticsSeries.Mean = mean
-			jobMetric.StatisticsSeries.Min = min
-			jobMetric.StatisticsSeries.Max = max
+			jobMetric.StatisticsSeries = &schema.StatsSeries{
+				Min: min, Mean: mean, Max: max,
+			}
 			jobMetric.Series = nil
 		}
 	}
diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go
index 62451c9..28a0069 100644
--- a/metricdata/cc-metric-store.go
+++ b/metricdata/cc-metric-store.go
@@ -7,6 +7,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"log"
 	"net/http"
 	"strconv"
 	"time"
@@ -81,6 +82,7 @@ func (ccms *CCMetricStore) doRequest(job *schema.Job, suffix string, metrics []s
 }
 
 func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+	// log.Printf("job: %#v", job)
 
 	type ApiQuery struct {
 		Metric string `json:"metric"`
@@ -106,7 +108,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
 	reqBody := ApiQueryRequest{
 		Cluster: job.Cluster,
 		From:    job.StartTime.Unix(),
-		To:      job.StartTime.Add(time.Duration(job.Duration)).Unix(),
+		To:      job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(),
 		Queries: make([]ApiQuery, 0),
 	}
 
@@ -118,12 +120,20 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
 	scopeForMetric := map[string]schema.MetricScope{}
 	for _, metric := range metrics {
 		mc := config.GetMetricConfig(job.Cluster, metric)
+		if mc == nil {
+			// return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
+			log.Printf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
+			continue
+		}
+
 		nativeScope, requestedScope := mc.Scope, scopes[0]
 
 		// case 1: A metric is requested at node scope with a native scope of node as well
 		// case 2: A metric is requested at node scope and node is exclusive
+		// case 3: A metric has native scope node
 		if (nativeScope == requestedScope && nativeScope == schema.MetricScopeNode) ||
-			(job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) {
+			(job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) ||
+			(nativeScope == schema.MetricScopeNode) {
 			nodes := map[string]bool{}
 			for _, resource := range job.Resources {
 				nodes[resource.Hostname] = true
@@ -188,6 +198,8 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
 		panic("todo")
 	}
 
+	// log.Printf("query: %#v", reqBody)
+
 	buf := &bytes.Buffer{}
 	if err := json.NewEncoder(buf).Encode(reqBody); err != nil {
 		return nil, err
@@ -213,9 +225,16 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
 		return nil, err
 	}
 
+	// log.Printf("response: %#v", resBody)
+
 	var jobData schema.JobData = make(schema.JobData)
 	for _, res := range resBody {
+		metric := res.Query.Metric
+		if _, ok := jobData[metric]; !ok {
+			jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
+		}
+
 		if res.Error != nil {
 			return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error)
 		}
 
@@ -239,6 +258,14 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
 			*id, _ = strconv.Atoi(res.Query.TypeIds[0])
 		}
 
+		if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
+			// TODO: use schema.Float instead of float64?
+			// This is done because regular float64 can not be JSONed when NaN.
+			res.Avg = schema.Float(0)
+			res.Min = schema.Float(0)
+			res.Max = schema.Float(0)
+		}
+
 		jobMetric.Series = append(jobMetric.Series, schema.Series{
 			Hostname: res.Query.Hostname,
 			Id:       id,
diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go
index 875b242..25f4925 100644
--- a/metricdata/metricdata.go
+++ b/metricdata/metricdata.go
@@ -63,7 +63,13 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
 			return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
 		}
 
-		return repo.LoadData(job, metrics, scopes, ctx)
+		data, err := repo.LoadData(job, metrics, scopes, ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		calcStatisticsSeries(job, data)
+		return data, nil
 	}
 
 	data, err := loadFromArchive(job)
diff --git a/schema/job.go b/schema/job.go
index d09fd67..8781776 100644
--- a/schema/job.go
+++ b/schema/job.go
@@ -11,7 +11,6 @@ import (
 // Common subset of Job and JobMeta. Use one of those, not
 // this type directly.
 type BaseJob struct {
-	ID      int64  `json:"id" db:"id"`
 	JobID   int64  `json:"jobId" db:"job_id"`
 	User    string `json:"user" db:"user"`
 	Project string `json:"project" db:"project"`
@@ -27,14 +26,15 @@ type BaseJob struct {
 	State        JobState    `json:"jobState" db:"job_state"`
 	Duration     int32       `json:"duration" db:"duration"`
 	Tags         []*Tag      `json:"tags"`
+	RawResources []byte      `json:"-" db:"resources"`
 	Resources    []*Resource `json:"resources"`
 	MetaData     interface{} `json:"metaData" db:"meta_data"`
 }
 
 // This type is used as the GraphQL interface and using sqlx as a table row.
 type Job struct {
+	ID int64 `json:"id" db:"id"`
 	BaseJob
-	RawResources []byte    `json:"-" db:"resources"`
 	StartTime   time.Time `json:"startTime" db:"start_time"`
 	MemUsedMax  float64   `json:"-" db:"mem_used_max"`
 	FlopsAnyAvg float64   `json:"-" db:"flops_any_avg"`
@@ -52,7 +52,7 @@ type Job struct {
 // the StartTime field with one of type int64.
 type JobMeta struct {
 	BaseJob
-	StartTime  int64                    `json:"startTime"`
+	StartTime  int64                    `json:"startTime" db:"start_time"`
 	Statistics map[string]JobStatistics `json:"statistics,omitempty"`
 }
 
@@ -68,16 +68,6 @@ var JobColumns []string = []string{
 	"job.duration", "job.resources", "job.meta_data",
 }
 
-const JobInsertStmt string = `INSERT INTO job (
-	job_id, user, project, cluster, partition, array_job_id, num_nodes, num_hwthreads, num_acc,
-	exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
-	mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
-) VALUES (
-	:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
-	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
-	:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
-);`
-
 type Scannable interface {
 	StructScan(dest interface{}) error
 }
@@ -85,7 +75,7 @@ type Scannable interface {
 // Helper function for scanning jobs with the `jobTableCols` columns selected.
 func ScanJob(row Scannable) (*Job, error) {
 	job := &Job{BaseJob: JobDefaults}
-	if err := row.StructScan(&job); err != nil {
+	if err := row.StructScan(job); err != nil {
 		return nil, err
 	}
@@ -97,6 +87,7 @@ func ScanJob(row Scannable) (*Job, error) {
 		job.Duration = int32(time.Since(job.StartTime).Seconds())
 	}
 
+	job.RawResources = nil
 	return job, nil
 }
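
Reviewer note on the `archive.go` condition flip: `len(series.Data) >= i` holds for almost every loop index (it only becomes false once `i` exceeds the slice length), so the old code marked essentially every point of the statistics series as NaN. `i >= len(series.Data)` is the actual out-of-bounds test. A standalone illustration:

```go
package main

import "fmt"

func main() {
	data := []float64{1, 2, 3}
	for i := 0; i < 5; i++ {
		// Old test (len(data) >= i): true for i = 0..3, so even
		// valid indices were treated as missing data.
		// Fixed test: only i = 3 and 4 are really out of range.
		if i >= len(data) {
			fmt.Println(i, "-> out of range")
			continue
		}
		fmt.Println(i, "->", data[i])
	}
}
```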
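On the `StatisticsSeries` assignment in the same file: the replacement builds the whole struct in one literal. If the field is a pointer (as the new `&schema.StatsSeries{...}` literal indicates), the old field-by-field writes would dereference nil for a freshly loaded metric. A toy reproduction under that assumption:

```go
package main

import "fmt"

type StatsSeries struct{ Min, Mean, Max []float64 }

type JobMetric struct{ StatisticsSeries *StatsSeries }

func main() {
	jm := JobMetric{} // StatisticsSeries is nil here

	// Old pattern: writing through the nil pointer panics.
	// jm.StatisticsSeries.Mean = []float64{1} // panic: nil pointer dereference

	// New pattern from the diff: allocate and fill in one step.
	jm.StatisticsSeries = &StatsSeries{
		Min: []float64{0}, Mean: []float64{1}, Max: []float64{2},
	}
	fmt.Println(jm.StatisticsSeries.Mean) // [1]
}
```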
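On the `To:` fix in `cc-metric-store.go`: `job.Duration` holds a plain count of seconds, but Go's `time.Duration` counts nanoseconds, so `time.Duration(job.Duration)` produced a query window of a few microseconds instead of the job's runtime. A minimal, self-contained sketch of the pitfall (timestamps invented for illustration):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Unix(1609459200, 0) // hypothetical job start
	duration := int32(3600)           // like job.Duration: a second count

	// Bug: the bare conversion treats 3600 as nanoseconds.
	wrong := start.Add(time.Duration(duration))

	// Fix from the diff: scale by time.Second.
	right := start.Add(time.Duration(duration) * time.Second)

	fmt.Println(wrong.Sub(start)) // 3.6µs
	fmt.Println(right.Sub(start)) // 1h0m0s
}
```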
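On zeroing NaN statistics before appending the series: `encoding/json` returns an `UnsupportedValueError` for NaN and ±Inf, so leaving the raw values in place would make the whole response unserializable. The TODO in the diff hints at using `schema.Float` instead; a wrapper along these lines (a sketch of the idea, not necessarily the actual `schema.Float` implementation) degrades NaN to `null` rather than erroring:

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// Float sketches the idea behind schema.Float: a float64 whose
// JSON encoding becomes null instead of failing on NaN.
type Float float64

func (f Float) MarshalJSON() ([]byte, error) {
	if math.IsNaN(float64(f)) {
		return []byte("null"), nil
	}
	return json.Marshal(float64(f))
}

func main() {
	// Plain float64: marshalling NaN is an error.
	_, err := json.Marshal(math.NaN())
	fmt.Println(err) // json: unsupported value: NaN

	// The wrapper type encodes cleanly.
	out, _ := json.Marshal(Float(math.NaN()))
	fmt.Println(string(out)) // null
}
```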
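On the one-character `ScanJob` fix in `schema/job.go`: `job` is already a `*Job`, so `&job` is a `**Job`, which reflection-based scanners like sqlx's `StructScan` reject; the destination must be a pointer to a struct. A stand-in that mimics that contract (the `scan` helper here is hypothetical, not sqlx's API):

```go
package main

import (
	"fmt"
	"reflect"
)

type Job struct{ ID int64 }

// scan mimics the contract of sqlx's StructScan: the destination
// must be a pointer to a struct, nothing else.
func scan(dest interface{}) error {
	v := reflect.ValueOf(dest)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("expected a pointer to a struct, got %T", dest)
	}
	return nil
}

func main() {
	job := &Job{}
	fmt.Println(scan(&job)) // expected a pointer to a struct, got **main.Job
	fmt.Println(scan(job))  // <nil>
}
```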