Merge branch 'import-data-sanitation' into 48_improve_status_view: Unit pkg update required for dev

This commit is contained in:
Christoph Kluge 2022-10-05 11:01:43 +02:00
commit 74c50bd424
14 changed files with 319 additions and 55 deletions

View File

@@ -17,6 +17,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/ClusterCockpit/cc-backend/pkg/units"
 )

 // `AUTO_INCREMENT` is in a comment because of this hack:
@@ -103,11 +104,11 @@ func HandleImportFlag(flag string) error {
 			return err
 		}

-		if config.Keys.Validate {
+		// if config.Keys.Validate {
 			if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
 				return fmt.Errorf("validate job meta: %v", err)
 			}
-		}
+		// }
 		dec := json.NewDecoder(bytes.NewReader(raw))
 		dec.DisallowUnknownFields()
 		jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
@@ -132,6 +133,7 @@ func HandleImportFlag(flag string) error {
 			return err
 		}

+		checkJobData(&jobData)
 		SanityChecks(&jobMeta.BaseJob)
 		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
 		if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
@@ -366,3 +368,33 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 {

 	return 0.0
 }
+
+func checkJobData(d *schema.JobData) error {
+	for _, scopes := range *d {
+		var newUnit string
+		// Add node scope if missing
+		for _, metric := range scopes {
+			if strings.Contains(metric.Unit.Base, "B/s") ||
+				strings.Contains(metric.Unit.Base, "F/s") ||
+				strings.Contains(metric.Unit.Base, "B") {
+
+				// First get overall avg
+				sum := 0.0
+				for _, s := range metric.Series {
+					sum += s.Statistics.Avg
+				}
+
+				avg := sum / float64(len(metric.Series))
+
+				for _, s := range metric.Series {
+					fp := schema.ConvertFloatToFloat64(s.Data)
+					// Normalize values with new unit prefix
+					units.NormalizeSeries(fp, avg, metric.Unit, &newUnit)
+					s.Data = schema.GetFloat64ToFloat(fp)
+				}
+				metric.Unit = newUnit
+			}
+		}
+	}
+	return nil
+}
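The new `checkJobData` pass rescales byte- and flop-rate series so archived values carry a readable unit prefix. A minimal standalone sketch of the idea, with the schema types omitted and the normalization helper copied from the `pkg/units` changes further down:

```go
package main

import (
	"fmt"
	"math"
)

// getNormalizationFactor mirrors the helper added to pkg/units in this commit:
// it returns the factor (and decimal exponent) that scales v into [1, 1000).
func getNormalizationFactor(v float64) (float64, int) {
	count := 0
	scale := -3
	if v > 1000.0 {
		for v > 1000.0 {
			v *= 1e-3
			count++
		}
	} else {
		for v < 1.0 {
			v *= 1e3
			count++
		}
		scale = 3
	}
	return math.Pow10(count * scale), count * scale
}

func main() {
	// Per-series averages feed one overall average, as in checkJobData.
	series := []float64{2890031237, 23998994567, 389734042344}
	sum := 0.0
	for _, v := range series {
		sum += v
	}
	avg := sum / float64(len(series))

	factor, exp := getNormalizationFactor(avg)
	for i := range series {
		series[i] = math.Ceil(series[i] * factor)
	}
	fmt.Println(series, factor, exp) // [3 24 390] 1e-09 -9
}
```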

View File

@@ -211,7 +211,13 @@ func (r *JobRepository) Stop(
 }

 // TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
-func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
+func (r *JobRepository) CountGroupedJobs(
+	ctx context.Context,
+	aggreg model.Aggregate,
+	filters []*model.JobFilter,
+	weight *model.Weights,
+	limit *int) (map[string]int, error) {
 	if !aggreg.IsValid() {
 		return nil, errors.New("invalid aggregate")
 	}
@@ -301,10 +307,14 @@ func (r *JobRepository) Archive(

 var ErrNotFound = errors.New("no such job or user")

-// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
-// As 0 is a valid job id, check if username is "" instead in order to check what machted.
-// If nothing matches the search, `ErrNotFound` is returned.
-func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (job int64, username string, err error) {
+// FindJobOrUser returns a job database ID or a username if a job or user
+// machtes the search term. As 0 is a valid job id, check if username is ""
+// instead in order to check what matched. If nothing matches the search,
+// `ErrNotFound` is returned.
+func (r *JobRepository) FindJobOrUser(
+	ctx context.Context,
+	searchterm string) (job int64, username string, err error) {
 	user := auth.GetUser(ctx)
 	if id, err := strconv.Atoi(searchterm); err == nil {
 		qb := sq.Select("job.id").From("job").Where("job.job_id = ?", id)
@@ -353,6 +363,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
 // Hosts with zero jobs running on them will not show up!
 func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
+
 	subclusters := make(map[string]map[string]int)
 	rows, err := sq.Select("resources", "subcluster").From("job").
 		Where("job.job_state = 'running'").
@@ -390,6 +401,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
 }

 func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
+
 	res, err := sq.Update("job").
 		Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
 		Set("duration", 0).

View File

@@ -45,7 +45,7 @@ type SubClusterConfig struct {

 type MetricConfig struct {
 	Name        string      `json:"name"`
-	Unit        string      `json:"unit"`
+	Unit        Unit        `json:"unit"`
 	Scope       MetricScope `json:"scope"`
 	Aggregation *string     `json:"aggregation"`
 	Timestep    int         `json:"timestep"`
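A metric's unit in the cluster config is now an object instead of a bare string. A small sketch of how the new shape decodes, using stand-in types (field set abridged, not the full `MetricConfig`):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-ins for the updated schema types.
type Unit struct {
	Base   string `json:"base"`
	Prefix string `json:"prefix"`
}

type MetricConfig struct {
	Name string `json:"name"`
	Unit Unit   `json:"unit"`
}

func main() {
	raw := `{"name": "mem_bw", "unit": {"base": "B/s", "prefix": "G"}}`

	var mc MetricConfig
	if err := json.Unmarshal([]byte(raw), &mc); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", mc) // {Name:mem_bw Unit:{Base:B/s Prefix:G}}
}
```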

View File

@@ -107,3 +107,23 @@ func (s *Series) MarshalJSON() ([]byte, error) {
 	buf = append(buf, ']', '}')
 	return buf, nil
 }
+
+func ConvertFloatToFloat64(s []Float) []float64 {
+	fp := make([]float64, len(s))
+
+	for i, val := range s {
+		fp[i] = float64(val)
+	}
+
+	return fp
+}
+
+func GetFloat64ToFloat(s []float64) []Float {
+	fp := make([]Float, len(s))
+
+	for i, val := range s {
+		fp[i] = Float(val)
+	}
+
+	return fp
+}
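These helpers bridge `schema.Float` (a NaN-aware float64) and the plain `[]float64` slices that `pkg/units` operates on. A round-trip sketch with a stand-in `Float` type:

```go
package main

import "fmt"

// Float is a stand-in for schema.Float, a float64 that encodes NaN as null in JSON.
type Float float64

func main() {
	data := []Float{1.5, 2.5, 3.5}

	// Widen to []float64 for the units package ...
	fp := make([]float64, len(data))
	for i, v := range data {
		fp[i] = float64(v)
	}

	// ... scale in place (here: a dummy factor) ...
	for i := range fp {
		fp[i] *= 1e-3
	}

	// ... and narrow back to []Float for re-serialization.
	out := make([]Float, len(fp))
	for i, v := range fp {
		out[i] = Float(v)
	}
	fmt.Println(out) // [0.0015 0.0025 0.0035]
}
```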

View File

@@ -89,11 +89,15 @@ var JobDefaults BaseJob = BaseJob{
 	MonitoringStatus: MonitoringStatusRunningOrArchiving,
 }

+type Unit struct {
+	Base   string `json:"base"`
+	Prefix string `json:"prefix"`
+}
+
 // JobStatistics model
 // @Description Specification for job metric statistics.
 type JobStatistics struct {
-	// Metric unit (see schema/unit.schema.json)
-	Unit string `json:"unit" example:"GHz"`
+	Unit Unit `json:"unit" example:"GHz"`
 	Avg float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average
 	Min float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum
 	Max float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum

View File

@@ -15,7 +15,7 @@ import (
 type JobData map[string]map[MetricScope]*JobMetric

 type JobMetric struct {
-	Unit     string      `json:"unit"`
+	Unit     Unit        `json:"unit"`
 	Scope    MetricScope `json:"scope"`
 	Timestep int         `json:"timestep"`
 	Series   []Series    `json:"series"`

View File

@@ -21,7 +21,7 @@
         },
         "unit": {
             "description": "Metric unit",
-            "type": "string"
+            "$ref": "embedfs://unit.schema.json"
         },
         "scope": {
             "description": "Native measurement resolution",
@@ -38,7 +38,6 @@
                 "sum",
                 "avg"
             ]
         },
     "subClusters": {
         "description": "Array of cluster hardware partition metric thresholds",

View File

@@ -349,7 +349,6 @@
         "jobState",
         "duration",
         "resources",
-        "tags",
         "statistics"
     ]
 }

View File

@@ -5,7 +5,7 @@
     "description": "Format specification for job metric units",
     "type": "object",
     "properties": {
-        "base_unit": {
+        "base": {
             "description": "Metric base unit",
             "type": "string",
             "enum": [
@@ -36,6 +36,6 @@
         }
     },
     "required": [
-        "base_unit"
+        "base"
     ]
 }

View File

@@ -1,6 +1,7 @@
 # cc-units - A unit system for ClusterCockpit

-When working with metrics, the problem comes up that they may use different unit name but have the same unit in fact. There are a lot of real world examples like 'kB' and 'Kbyte'. In [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector), the collectors read data from different sources which may use different units or the programmer specifies a unit for a metric by hand. The cc-units system is not comparable with the SI unit system. If you are looking for a package for the SI units, see [here](https://pkg.go.dev/github.com/gurre/si).
+When working with metrics, the problem comes up that they may use different unit name but have the same unit in fact.
+There are a lot of real world examples like 'kB' and 'Kbyte'. In [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector), the collectors read data from different sources which may use different units or the programmer specifies a unit for a metric by hand. The cc-units system is not comparable with the SI unit system. If you are looking for a package for the SI units, see [here](https://pkg.go.dev/github.com/gurre/si).

 In order to enable unit comparison and conversion, the ccUnits package provides some helpers:
 ```go

View File

@@ -1,6 +1,7 @@
 package units

 import (
+	"math"
 	"regexp"
 )
@@ -172,3 +173,20 @@ func NewPrefix(prefix string) Prefix {
 	}
 	return InvalidPrefix
 }
+
+func getExponent(p float64) int {
+	count := 0
+
+	for p > 1.0 {
+		p = p / 1000.0
+		count++
+	}
+
+	return count * 3
+}
+
+func NewPrefixFromFactor(op Prefix, e int) Prefix {
+	f := float64(op)
+	exp := math.Pow10(getExponent(f) - e)
+	return Prefix(exp)
+}
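`getExponent` recovers a prefix's decimal exponent in steps of three, and `NewPrefixFromFactor` shifts a prefix by a normalization exponent. A worked sketch with a stand-in `Prefix` type (the real type lives in `pkg/units`):

```go
package main

import (
	"fmt"
	"math"
)

// Prefix is a stand-in for the package's Prefix type: its value is the
// decimal factor of the prefix (Mega = 1e6, Giga = 1e9, ...).
type Prefix float64

func getExponent(p float64) int {
	count := 0
	for p > 1.0 {
		p = p / 1000.0
		count++
	}
	return count * 3
}

// NewPrefixFromFactor shifts prefix op by exponent e,
// e.g. Giga shifted by e=3 becomes Mega (1e9 / 1e3 = 1e6).
func NewPrefixFromFactor(op Prefix, e int) Prefix {
	return Prefix(math.Pow10(getExponent(float64(op)) - e))
}

func main() {
	giga := Prefix(1e9)
	fmt.Println(getExponent(float64(giga)))   // 9
	fmt.Println(NewPrefixFromFactor(giga, 3)) // 1e+06 (Mega)
}
```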

View File

@@ -3,6 +3,7 @@ package units

 import (
 	"fmt"
+	"math"
 	"strings"
 )
@@ -25,7 +26,9 @@ type Unit interface {

 var INVALID_UNIT = NewUnit("foobar")

-// Valid checks whether a unit is a valid unit. A unit is valid if it has at least a prefix and a measure. The unit denominator is optional.
+// Valid checks whether a unit is a valid unit.
+// A unit is valid if it has at least a prefix and a measure.
+// The unit denominator is optional.
 func (u *unit) Valid() bool {
 	return u.prefix != InvalidPrefix && u.measure != InvalidMeasure
 }
@@ -71,6 +74,63 @@ func (u *unit) getUnitDenominator() Measure {
 	return u.divMeasure
 }
+
+func ConvertValue(v *float64, from string, to string) {
+	uf := NewUnit(from)
+	ut := NewUnit(to)
+	factor := float64(uf.getPrefix()) / float64(ut.getPrefix())
+	*v = math.Ceil(*v * factor)
+}
+
+func ConvertSeries(s []float64, from string, to string) {
+	uf := NewUnit(from)
+	ut := NewUnit(to)
+	factor := float64(uf.getPrefix()) / float64(ut.getPrefix())
+
+	for i := 0; i < len(s); i++ {
+		s[i] = math.Ceil(s[i] * factor)
+	}
+}
+
+func getNormalizationFactor(v float64) (float64, int) {
+	count := 0
+	scale := -3
+
+	if v > 1000.0 {
+		for v > 1000.0 {
+			v *= 1e-3
+			count++
+		}
+	} else {
+		for v < 1.0 {
+			v *= 1e3
+			count++
+		}
+		scale = 3
+	}
+	return math.Pow10(count * scale), count * scale
+}
+
+func NormalizeValue(v *float64, us string, nu *string) {
+	u := NewUnit(us)
+	f, e := getNormalizationFactor((*v))
+	*v = math.Ceil(*v * f)
+	u.setPrefix(NewPrefixFromFactor(u.getPrefix(), e))
+	*nu = u.Short()
+}
+
+func NormalizeSeries(s []float64, avg float64, us string, nu *string) {
+	u := NewUnit(us)
+	f, e := getNormalizationFactor(avg)
+
+	for i := 0; i < len(s); i++ {
+		s[i] *= f
+		s[i] = math.Ceil(s[i])
+	}
+	u.setPrefix(NewPrefixFromFactor(u.getPrefix(), e))
+	fmt.Printf("Prefix: %e \n", u.getPrefix())
+	*nu = u.Short()
+}

 // GetPrefixPrefixFactor creates the default conversion function between two prefixes.
 // It returns a conversation function for the value.
 func GetPrefixPrefixFactor(in Prefix, out Prefix) func(value interface{}) interface{} {
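Together, these new entry points either convert between two explicit prefixes or pick a prefix automatically so values land in [1, 1000). A usage sketch, runnable inside the cc-backend module; the expected results follow the tests below:

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/units"
)

func main() {
	// Explicit conversion between two known prefixes.
	v := 103456.0
	units.ConvertValue(&v, "MB/s", "GB/s")
	fmt.Println(v) // 104 (values are rounded up with math.Ceil)

	// Automatic normalization: pick the prefix that puts the value in [1, 1000).
	var nu string
	n := 103458596.0
	units.NormalizeValue(&n, "F/s", &nu)
	fmt.Println(n, nu) // 104 MFlops/s
}
```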

View File

@@ -2,6 +2,7 @@ package units

 import (
 	"fmt"
+	"reflect"
 	"regexp"
 	"testing"
 )
@@ -199,3 +200,108 @@ func TestPrefixRegex(t *testing.T) {
 		t.Logf("succussfully compiled regex '%s' for prefix %s", data.Regex, data.Long)
 	}
 }
+
+func TestConvertValue(t *testing.T) {
+	v := float64(103456)
+	ConvertValue(&v, "MB/s", "GB/s")
+
+	if v != 104.00 {
+		t.Errorf("Failed ConvertValue: Want 103.456, Got %f", v)
+	}
+}
+
+func TestConvertValueUp(t *testing.T) {
+	v := float64(10.3456)
+	ConvertValue(&v, "GB/s", "MB/s")
+
+	if v != 10346.00 {
+		t.Errorf("Failed ConvertValue: Want 10346.00, Got %f", v)
+	}
+}
+
+func TestConvertSeries(t *testing.T) {
+	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
+	r := []float64{3, 24, 390, 391}
+	ConvertSeries(s, "F/s", "GF/s")
+
+	if !reflect.DeepEqual(s, r) {
+		t.Errorf("Failed ConvertValue: Want 3, 24, 390, 391, Got %v", s)
+	}
+}
+
+func TestNormalizeValue(t *testing.T) {
+	var s string
+	v := float64(103456)
+	NormalizeValue(&v, "MB/s", &s)
+
+	if v != 104.00 {
+		t.Errorf("Failed ConvertValue: Want 104.00, Got %f", v)
+	}
+	if s != "GB/s" {
+		t.Errorf("Failed Prefix or unit: Want GB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueNoPrefix(t *testing.T) {
+	var s string
+	v := float64(103458596)
+	NormalizeValue(&v, "F/s", &s)
+
+	if v != 104.00 {
+		t.Errorf("Failed ConvertValue: Want 104.00, Got %f", v)
+	}
+	if s != "MFlops/s" {
+		t.Errorf("Failed Prefix or unit: Want GB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueKeep(t *testing.T) {
+	var s string
+	v := float64(345)
+	NormalizeValue(&v, "MB/s", &s)

+	if v != 345.00 {
+		t.Errorf("Failed ConvertValue: Want 104.00, Got %f", v)
+	}
+	if s != "MB/s" {
+		t.Errorf("Failed Prefix or unit: Want GB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueDown(t *testing.T) {
+	var s string
+	v := float64(0.0004578)
+	NormalizeValue(&v, "GB/s", &s)
+
+	if v != 458.00 {
+		t.Errorf("Failed ConvertValue: Want 458.00, Got %f", v)
+	}
+	if s != "KB/s" {
+		t.Errorf("Failed Prefix or unit: Want KB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeSeries(t *testing.T) {
+	var us string
+	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
+	r := []float64{3, 24, 390, 391}
+
+	total := 0.0
+	for _, number := range s {
+		total += number
+	}
+	avg := total / float64(len(s))
+
+	fmt.Printf("AVG: %e\n", avg)
+	NormalizeSeries(s, avg, "KB/s", &us)
+
+	if !reflect.DeepEqual(s, r) {
+		t.Errorf("Failed ConvertValue: Want 3, 24, 390, 391, Got %v", s)
+	}
+	if us != "TB/s" {
+		t.Errorf("Failed Prefix or unit: Want TB/s, Got %s", us)
+	}
+}
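Why `TestNormalizeSeries` expects `TB/s`: the series average (about 2.0e11) takes three divide-by-1000 steps to drop below 1000, so `getNormalizationFactor` returns 1e-9 with exponent -9, and the Kilo prefix (10^3) is shifted by 3 - (-9) = 12 decimal places to Tera. A quick sketch recomputing the expected values:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
	factor := 1e-9 // from getNormalizationFactor(avg), avg ≈ 2.0e11
	for i := range s {
		s[i] = math.Ceil(s[i] * factor)
	}
	fmt.Println(s) // [3 24 390 391], reported as TB/s
}
```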

View File

@@ -602,4 +602,17 @@ func testImportFlag(t *testing.T) {
 	if len(data) != 8 {
 		t.Errorf("Job data length: Got %d, want 8", len(data))
 	}
+
+	r := map[string]string{"mem_used": "GB", "net_bw": "KB/s",
+		"cpu_power": "W", "cpu_used": "cpu_used",
+		"file_bw": "KB/s", "flops_any": "Flops/s",
+		"mem_bw": "GB/s", "ipc": "IPC"}
+
+	for name, scopes := range data {
+		for _, metric := range scopes {
+			if metric.Unit != r[name] {
+				t.Errorf("Metric %s unit: Got %s, want %s", name, metric.Unit, r[name])
+			}
+		}
+	}
 }