Serve job metric data via GraphQL

Author: Lou Knauer, 2021-04-07 09:19:21 +02:00
parent 4273878e9b
commit a52445086b
6 changed files with 2164 additions and 5 deletions

.gitignore (new file)

@@ -0,0 +1,3 @@
job.db
job-data
cc-jobarchive

README.md

@@ -1,6 +1,7 @@
# Run server
* The server expects the SQLite Job database in `job.db`.
* The metric data is expected as JSON in `job-data/.../.../{data.json|meta.json}`
* Run ```go run server.go```
* The GraphQL backend is located at http://localhost:8080/query/.
@@ -34,7 +35,7 @@ Using the Query variables:
"filter": { "list": [
    {"userId": {"contains": "unrz"}},
    {"duration": {"from": 60, "to": 1000}},
    {"startTime": {"from": "2019-06-01T00:00:00.00Z", "to": "2019-10-01T00:00:00.00Z"}}]},
"sorting": { "field": "start_time", "order": "ASC" },
"paging": { "itemsPerPage": 20, "page": 1 }
}
@@ -44,4 +45,4 @@ Using the Query variables:
* Edit ```./graph/schema.graphqls```
* Regenerate code: ```gqlgen generate```
* Implement callbacks in ```graph/resolvers.go```
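
For orientation: after `gqlgen generate`, a new query field first appears in `graph/resolvers.go` as an unimplemented stub, roughly like the sketch below (gqlgen's usual panic placeholder; not code from this commit):

```go
// Sketch of a freshly generated stub in graph/resolvers.go; gqlgen emits a
// panic placeholder per new schema field until the callback is implemented.
func (r *queryResolver) JobDataByID(ctx context.Context, jobId string) (*model.JobData, error) {
    panic(fmt.Errorf("not implemented"))
}
```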

graph/generated/generated.go (file diff suppressed because it is too large)

graph/model/models_gen.go

@@ -34,6 +34,28 @@ type IntRange struct {
    To int `json:"to"`
}
type JobData struct {
    LoadOne            *JobMetric `json:"load_one"`
    MemUsed            *JobMetric `json:"mem_used"`
    MemBw              *JobMetric `json:"mem_bw"`
    FlopsAny           *JobMetric `json:"flops_any"`
    FlopsDp            *JobMetric `json:"flops_dp"`
    FlopsSp            *JobMetric `json:"flops_sp"`
    CpiAvg             *JobMetric `json:"cpi_avg"`
    ClockSpeed         *JobMetric `json:"clock_speed"`
    TotalPower         *JobMetric `json:"total_power"`
    TrafficReadEth     *JobMetric `json:"traffic_read_eth"`
    TrafficWriteEth    *JobMetric `json:"traffic_write_eth"`
    TrafficReadLustre  *JobMetric `json:"traffic_read_lustre"`
    TrafficWriteLustre *JobMetric `json:"traffic_write_lustre"`
    RegReadLustre      *JobMetric `json:"reg_read_lustre"`
    RegWriteLustre     *JobMetric `json:"reg_write_lustre"`
    InodesLustre       *JobMetric `json:"inodes_lustre"`
    PkgRateReadIb      *JobMetric `json:"pkg_rate_read_ib"`
    PkgRateWriteIb     *JobMetric `json:"pkg_rate_write_ib"`
    CongestionIb       *JobMetric `json:"congestion_ib"`
}
type JobFilter struct {
    JobID  *StringInput `json:"jobId"`
    UserID *StringInput `json:"userId"`
@@ -49,6 +71,30 @@ type JobFilterList struct {
    List []*JobFilter `json:"list"`
}
type JobMetric struct {
    Unit     string             `json:"unit"`
    Scope    JobMetricScope     `json:"scope"`
    Timestep int                `json:"timestep"`
    Series   []*JobMetricSeries `json:"series"`
}

type JobMetricSeries struct {
    NodeID     string               `json:"node_id"`
    Statistics *JobMetricStatistics `json:"statistics"`
    Data       []*float64           `json:"data"`
}

type JobMetricStatistics struct {
    Avg float64 `json:"avg"`
    Min float64 `json:"min"`
    Max float64 `json:"max"`
}

type JobMetricWithName struct {
    Name   string     `json:"name"`
    Metric *JobMetric `json:"metric"`
}
type JobResultList struct {
    Items  []*Job `json:"items"`
    Offset *int   `json:"offset"`
@@ -100,6 +146,49 @@ type TimeRange struct {
    To time.Time `json:"to"`
}
type JobMetricScope string

const (
    JobMetricScopeNode   JobMetricScope = "node"
    JobMetricScopeCPU    JobMetricScope = "cpu"
    JobMetricScopeSocket JobMetricScope = "socket"
)

var AllJobMetricScope = []JobMetricScope{
    JobMetricScopeNode,
    JobMetricScopeCPU,
    JobMetricScopeSocket,
}

func (e JobMetricScope) IsValid() bool {
    switch e {
    case JobMetricScopeNode, JobMetricScopeCPU, JobMetricScopeSocket:
        return true
    }
    return false
}

func (e JobMetricScope) String() string {
    return string(e)
}

func (e *JobMetricScope) UnmarshalGQL(v interface{}) error {
    str, ok := v.(string)
    if !ok {
        return fmt.Errorf("enums must be strings")
    }
    *e = JobMetricScope(str)
    if !e.IsValid() {
        return fmt.Errorf("%s is not a valid JobMetricScope", str)
    }
    return nil
}

func (e JobMetricScope) MarshalGQL(w io.Writer) {
    fmt.Fprint(w, strconv.Quote(e.String()))
}
type SortDirectionEnum string

const (
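
As a reading aid for the `JobData`/`JobMetric` types above, here is a minimal, self-contained sketch of how a `data.json` payload decodes into them. The metric values and node id are invented for illustration, and the types are trimmed local copies (the real `Scope` field is the `JobMetricScope` enum):

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Trimmed local copies of the generated model types, just enough
// to decode the sample payload below.
type JobMetricStatistics struct {
    Avg float64 `json:"avg"`
    Min float64 `json:"min"`
    Max float64 `json:"max"`
}

type JobMetricSeries struct {
    NodeID     string               `json:"node_id"`
    Statistics *JobMetricStatistics `json:"statistics"`
    Data       []*float64           `json:"data"`
}

type JobMetric struct {
    Unit     string             `json:"unit"`
    Scope    string             `json:"scope"` // JobMetricScope in the real model
    Timestep int                `json:"timestep"`
    Series   []*JobMetricSeries `json:"series"`
}

type JobData struct {
    LoadOne *JobMetric `json:"load_one"`
    MemUsed *JobMetric `json:"mem_used"`
    // ... remaining metrics omitted in this sketch ...
}

func main() {
    // Invented sample: only load_one is present; every absent metric stays nil.
    sample := `{
        "load_one": {
            "unit": "load", "scope": "node", "timestep": 60,
            "series": [{
                "node_id": "node001",
                "statistics": {"avg": 1.2, "min": 0.9, "max": 1.5},
                "data": [0.9, 1.2, 1.5]
            }]
        }
    }`

    var jd JobData
    if err := json.Unmarshal([]byte(sample), &jd); err != nil {
        panic(err)
    }
    fmt.Println(jd.LoadOne.Series[0].Statistics.Avg) // 1.2
    fmt.Println(jd.MemUsed == nil)                   // true
}
```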

graph/resolvers.go

@@ -6,6 +6,9 @@ import (
    "fmt"
    "log"
    "strings"
    "os"
    "strconv"
    "encoding/json"

    "github.com/ClusterCockpit/cc-jobarchive/graph/generated"
    "github.com/ClusterCockpit/cc-jobarchive/graph/model"
@@ -243,6 +246,128 @@ func (r *queryResolver) JobsStatistics(
    return &stats, nil
}
func (r *queryResolver) JobDataByID(
    ctx context.Context, jobId string) (*model.JobData, error) {
    // TODO: What to do with the suffix?
    jobId = strings.Split(jobId, ".")[0]
    id, err := strconv.Atoi(jobId)
    if err != nil {
        return nil, err
    }

    lvl1, lvl2 := id/1000, id%1000
    filepath := fmt.Sprintf("./job-data/%d/%03d/data.json", lvl1, lvl2)
    f, err := os.ReadFile(filepath)
    if err != nil {
        return nil, err
    }

    jobData := new(model.JobData)
    err = json.Unmarshal(f, jobData)
    if err != nil {
        return nil, err
    }

    return jobData, nil
}
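
// As a worked example of the two-level layout above (illustration only, not
// part of the commit): the job id is split at 1000, and %03d zero-pads the
// remainder to three digits, so
//
//   id 1234567 -> lvl1 = 1234, lvl2 = 567 -> "./job-data/1234/567/data.json"
//   id 42      -> lvl1 = 0,    lvl2 = 42  -> "./job-data/0/042/data.json"
//
// A hypothetical helper capturing the same computation:
//
//   func jobDataPath(id int) string {
//       return fmt.Sprintf("./job-data/%d/%03d/data.json", id/1000, id%1000)
//   }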
func (r *queryResolver) JobAvailableMetricsByID(
    ctx context.Context, jobId string) ([]*model.JobMetricWithName, error) {
    jobData, err := r.JobDataByID(ctx, jobId)
    if err != nil {
        return nil, err
    }

    var list []*model.JobMetricWithName

    /*
     * GraphQL has no map type, so this is
     * the best I could come up with.
     * This is only for testing anyway?
     */
    if jobData.LoadOne != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "load_one", Metric: jobData.LoadOne})
    }
    if jobData.MemUsed != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "mem_used", Metric: jobData.MemUsed})
    }
    if jobData.MemBw != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "mem_bw", Metric: jobData.MemBw})
    }
    if jobData.FlopsAny != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "flops_any", Metric: jobData.FlopsAny})
    }
    if jobData.FlopsDp != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "flops_dp", Metric: jobData.FlopsDp})
    }
    if jobData.FlopsSp != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "flops_sp", Metric: jobData.FlopsSp})
    }
    if jobData.CpiAvg != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "cpi_avg", Metric: jobData.CpiAvg})
    }
    if jobData.ClockSpeed != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "clock_speed", Metric: jobData.ClockSpeed})
    }
    if jobData.TotalPower != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "total_power", Metric: jobData.TotalPower})
    }
    if jobData.TrafficReadEth != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "traffic_read_eth", Metric: jobData.TrafficReadEth})
    }
    if jobData.TrafficWriteEth != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "traffic_write_eth", Metric: jobData.TrafficWriteEth})
    }
    if jobData.TrafficReadLustre != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "traffic_read_lustre", Metric: jobData.TrafficReadLustre})
    }
    if jobData.TrafficWriteLustre != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "traffic_write_lustre", Metric: jobData.TrafficWriteLustre})
    }
    if jobData.RegReadLustre != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "reg_read_lustre", Metric: jobData.RegReadLustre})
    }
    if jobData.RegWriteLustre != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "reg_write_lustre", Metric: jobData.RegWriteLustre})
    }
    if jobData.InodesLustre != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "inodes_lustre", Metric: jobData.InodesLustre})
    }
    if jobData.PkgRateReadIb != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "pkg_rate_read_ib", Metric: jobData.PkgRateReadIb})
    }
    if jobData.PkgRateWriteIb != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "pkg_rate_write_ib", Metric: jobData.PkgRateWriteIb})
    }
    if jobData.CongestionIb != nil {
        list = append(list, &model.JobMetricWithName{
            Name: "congestion_ib", Metric: jobData.CongestionIb})
    }

    return list, nil
}
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }

type queryResolver struct{ *Resolver }
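
Since GraphQL has no map type, `JobAvailableMetricsByID` enumerates every `JobData` field by hand. A more compact, table-driven variant is sketched below; this is an illustration of an alternative, not what the commit does, and it reuses the `model` types defined above:

```go
// Sketch: build the (name, metric) pairs once and filter out nil entries,
// instead of writing one if-block per metric. Illustration only.
func availableMetrics(jobData *model.JobData) []*model.JobMetricWithName {
    pairs := []struct {
        name   string
        metric *model.JobMetric
    }{
        {"load_one", jobData.LoadOne},
        {"mem_used", jobData.MemUsed},
        {"mem_bw", jobData.MemBw},
        // ... one entry per remaining JobData field ...
    }

    var list []*model.JobMetricWithName
    for _, p := range pairs {
        if p.metric != nil {
            list = append(list, &model.JobMetricWithName{Name: p.name, Metric: p.metric})
        }
    }
    return list
}
```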

graph/schema.graphqls

@@ -9,10 +9,60 @@ type Job {
    numNodes: Int!
}
type JobData {
    load_one: JobMetric
    mem_used: JobMetric
    mem_bw: JobMetric
    flops_any: JobMetric
    flops_dp: JobMetric
    flops_sp: JobMetric
    cpi_avg: JobMetric
    clock_speed: JobMetric
    total_power: JobMetric
    traffic_read_eth: JobMetric
    traffic_write_eth: JobMetric
    traffic_read_lustre: JobMetric
    traffic_write_lustre: JobMetric
    reg_read_lustre: JobMetric
    reg_write_lustre: JobMetric
    inodes_lustre: JobMetric
    pkg_rate_read_ib: JobMetric
    pkg_rate_write_ib: JobMetric
    congestion_ib: JobMetric
}

type JobMetric {
    unit: String!
    scope: JobMetricScope!
    timestep: Int!
    series: [JobMetricSeries]!
}

enum JobMetricScope {
    node
    cpu
    socket
}

type JobMetricSeries {
    node_id: String!
    statistics: JobMetricStatistics
    data: [Float]!
}

type JobMetricStatistics {
    avg: Float!
    min: Float!
    max: Float!
}
type Query {
    jobById(jobId: String!): Job
    jobs(filter: JobFilterList, page: PageRequest, order: OrderByInput): JobResultList!
    jobsStatistics(filter: JobFilterList): JobsStatistics!
    jobDataById(jobId: String!): JobData
    jobAvailableMetricsById(jobId: String!): [JobMetricWithName]!
}
input StartJobInput {
@@ -93,6 +143,11 @@ type JobResultList {
    count: Int
}
type JobMetricWithName {
    name: String!
    metric: JobMetric!
}
type HistoPoint {
    count: Int!
    value: Int!
}
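
To exercise the new queries end to end, here is a minimal, self-contained client sketch in Go. It assumes the server from the README is running on localhost:8080; the job id is made up:

```go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Made-up job id; jobDataById and the /query endpoint come from the
    // schema and README above.
    query := `{
        jobDataById(jobId: "1234567") {
            load_one {
                unit timestep
                series { node_id statistics { avg min max } }
            }
        }
    }`

    body, err := json.Marshal(map[string]string{"query": query})
    if err != nil {
        panic(err)
    }

    resp, err := http.Post("http://localhost:8080/query",
        "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var result map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        panic(err)
    }
    fmt.Println(result["data"])
}
```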