mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-25 21:09:05 +01:00
Fix issues in conversion tool
This commit is contained in:
parent
70b39730d2
commit
8dc9e6ecdd
@ -39,7 +39,7 @@ var MeasuresMap map[Measure]MeasureData = map[Measure]MeasureData{
|
||||
},
|
||||
Flops: {
|
||||
Long: "Flops",
|
||||
Short: "Flops",
|
||||
Short: "F",
|
||||
Regex: "^([fF][lL]?[oO]?[pP]?[sS]?)",
|
||||
},
|
||||
Percentage: {
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
type unit struct {
|
||||
@ -131,6 +133,32 @@ func NormalizeSeries(s []float64, avg float64, us string, nu *string) {
|
||||
*nu = u.Short()
|
||||
}
|
||||
|
||||
func ConvertUnitString(us string) schema.Unit {
|
||||
var nu schema.Unit
|
||||
|
||||
if us == "CPI" ||
|
||||
us == "IPC" ||
|
||||
us == "load" ||
|
||||
us == "" {
|
||||
nu.Base = us
|
||||
return nu
|
||||
}
|
||||
u := NewUnit(us)
|
||||
p := u.getPrefix()
|
||||
if p.Prefix() != "" {
|
||||
nu.Prefix = p.Prefix()
|
||||
}
|
||||
m := u.getMeasure()
|
||||
d := u.getUnitDenominator()
|
||||
if d.Short() != "inval" {
|
||||
nu.Base = fmt.Sprintf("%s/%s", m.Short(), d.Short())
|
||||
} else {
|
||||
nu.Base = m.Short()
|
||||
}
|
||||
|
||||
return nu
|
||||
}
|
||||
|
||||
// GetPrefixPrefixFactor creates the default conversion function between two prefixes.
|
||||
// It returns a conversation function for the value.
|
||||
func GetPrefixPrefixFactor(in Prefix, out Prefix) func(value interface{}) interface{} {
|
||||
|
@ -4,171 +4,62 @@
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import "strconv"
|
||||
import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
type Accelerator struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
// type Accelerator struct {
|
||||
// ID string `json:"id"`
|
||||
// Type string `json:"type"`
|
||||
// Model string `json:"model"`
|
||||
// }
|
||||
|
||||
type Topology struct {
|
||||
Node []int `json:"node"`
|
||||
Socket [][]int `json:"socket"`
|
||||
MemoryDomain [][]int `json:"memoryDomain"`
|
||||
Die [][]int `json:"die"`
|
||||
Core [][]int `json:"core"`
|
||||
Accelerators []*Accelerator `json:"accelerators"`
|
||||
}
|
||||
// type Topology struct {
|
||||
// Node []int `json:"node"`
|
||||
// Socket [][]int `json:"socket"`
|
||||
// MemoryDomain [][]int `json:"memoryDomain"`
|
||||
// Die [][]int `json:"die"`
|
||||
// Core [][]int `json:"core"`
|
||||
// Accelerators []*Accelerator `json:"accelerators"`
|
||||
// }
|
||||
|
||||
type SubCluster struct {
|
||||
Name string `json:"name"`
|
||||
Nodes string `json:"nodes"`
|
||||
NumberOfNodes int `json:"numberOfNodes"`
|
||||
ProcessorType string `json:"processorType"`
|
||||
SocketsPerNode int `json:"socketsPerNode"`
|
||||
CoresPerSocket int `json:"coresPerSocket"`
|
||||
ThreadsPerCore int `json:"threadsPerCore"`
|
||||
FlopRateScalar int `json:"flopRateScalar"`
|
||||
FlopRateSimd int `json:"flopRateSimd"`
|
||||
MemoryBandwidth int `json:"memoryBandwidth"`
|
||||
Topology *Topology `json:"topology"`
|
||||
}
|
||||
// type SubCluster struct {
|
||||
// Name string `json:"name"`
|
||||
// Nodes string `json:"nodes"`
|
||||
// NumberOfNodes int `json:"numberOfNodes"`
|
||||
// ProcessorType string `json:"processorType"`
|
||||
// SocketsPerNode int `json:"socketsPerNode"`
|
||||
// CoresPerSocket int `json:"coresPerSocket"`
|
||||
// ThreadsPerCore int `json:"threadsPerCore"`
|
||||
// FlopRateScalar int `json:"flopRateScalar"`
|
||||
// FlopRateSimd int `json:"flopRateSimd"`
|
||||
// MemoryBandwidth int `json:"memoryBandwidth"`
|
||||
// Topology *Topology `json:"topology"`
|
||||
// }
|
||||
|
||||
type SubClusterConfig struct {
|
||||
Name string `json:"name"`
|
||||
Peak float64 `json:"peak"`
|
||||
Normal float64 `json:"normal"`
|
||||
Caution float64 `json:"caution"`
|
||||
Alert float64 `json:"alert"`
|
||||
}
|
||||
// type SubClusterConfig struct {
|
||||
// Name string `json:"name"`
|
||||
// Peak float64 `json:"peak"`
|
||||
// Normal float64 `json:"normal"`
|
||||
// Caution float64 `json:"caution"`
|
||||
// Alert float64 `json:"alert"`
|
||||
// }
|
||||
|
||||
type MetricConfig struct {
|
||||
Name string `json:"name"`
|
||||
Unit string `json:"unit"`
|
||||
Scope MetricScope `json:"scope"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
Aggregation *string `json:"aggregation"`
|
||||
Timestep int `json:"timestep"`
|
||||
Peak *float64 `json:"peak"`
|
||||
Normal *float64 `json:"normal"`
|
||||
Caution *float64 `json:"caution"`
|
||||
Alert *float64 `json:"alert"`
|
||||
SubClusters []*SubClusterConfig `json:"subClusters"`
|
||||
SubClusters []*schema.SubClusterConfig `json:"subClusters"`
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
Name string `json:"name"`
|
||||
MetricConfig []*MetricConfig `json:"metricConfig"`
|
||||
SubClusters []*SubCluster `json:"subClusters"`
|
||||
}
|
||||
|
||||
// Return a list of socket IDs given a list of hwthread IDs. Even if just one
|
||||
// hwthread is in that socket, add it to the list. If no hwthreads other than
|
||||
// those in the argument list are assigned to one of the sockets in the first
|
||||
// return value, return true as the second value. TODO: Optimize this, there
|
||||
// must be a more efficient way/algorithm.
|
||||
func (topo *Topology) GetSocketsFromHWThreads(
|
||||
hwthreads []int) (sockets []int, exclusive bool) {
|
||||
|
||||
socketsMap := map[int]int{}
|
||||
for _, hwthread := range hwthreads {
|
||||
for socket, hwthreadsInSocket := range topo.Socket {
|
||||
for _, hwthreadInSocket := range hwthreadsInSocket {
|
||||
if hwthread == hwthreadInSocket {
|
||||
socketsMap[socket] += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
exclusive = true
|
||||
hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
|
||||
sockets = make([]int, 0, len(socketsMap))
|
||||
for socket, count := range socketsMap {
|
||||
sockets = append(sockets, socket)
|
||||
exclusive = exclusive && count == hwthreadsPerSocket
|
||||
}
|
||||
|
||||
return sockets, exclusive
|
||||
}
|
||||
|
||||
// Return a list of core IDs given a list of hwthread IDs. Even if just one
|
||||
// hwthread is in that core, add it to the list. If no hwthreads other than
|
||||
// those in the argument list are assigned to one of the cores in the first
|
||||
// return value, return true as the second value. TODO: Optimize this, there
|
||||
// must be a more efficient way/algorithm.
|
||||
func (topo *Topology) GetCoresFromHWThreads(
|
||||
hwthreads []int) (cores []int, exclusive bool) {
|
||||
|
||||
coresMap := map[int]int{}
|
||||
for _, hwthread := range hwthreads {
|
||||
for core, hwthreadsInCore := range topo.Core {
|
||||
for _, hwthreadInCore := range hwthreadsInCore {
|
||||
if hwthread == hwthreadInCore {
|
||||
coresMap[core] += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
exclusive = true
|
||||
hwthreadsPerCore := len(topo.Node) / len(topo.Core)
|
||||
cores = make([]int, 0, len(coresMap))
|
||||
for core, count := range coresMap {
|
||||
cores = append(cores, core)
|
||||
exclusive = exclusive && count == hwthreadsPerCore
|
||||
}
|
||||
|
||||
return cores, exclusive
|
||||
}
|
||||
|
||||
// Return a list of memory domain IDs given a list of hwthread IDs. Even if
|
||||
// just one hwthread is in that memory domain, add it to the list. If no
|
||||
// hwthreads other than those in the argument list are assigned to one of the
|
||||
// memory domains in the first return value, return true as the second value.
|
||||
// TODO: Optimize this, there must be a more efficient way/algorithm.
|
||||
func (topo *Topology) GetMemoryDomainsFromHWThreads(
|
||||
hwthreads []int) (memDoms []int, exclusive bool) {
|
||||
|
||||
memDomsMap := map[int]int{}
|
||||
for _, hwthread := range hwthreads {
|
||||
for memDom, hwthreadsInmemDom := range topo.MemoryDomain {
|
||||
for _, hwthreadInmemDom := range hwthreadsInmemDom {
|
||||
if hwthread == hwthreadInmemDom {
|
||||
memDomsMap[memDom] += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
exclusive = true
|
||||
hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain)
|
||||
memDoms = make([]int, 0, len(memDomsMap))
|
||||
for memDom, count := range memDomsMap {
|
||||
memDoms = append(memDoms, memDom)
|
||||
exclusive = exclusive && count == hwthreadsPermemDom
|
||||
}
|
||||
|
||||
return memDoms, exclusive
|
||||
}
|
||||
|
||||
func (topo *Topology) GetAcceleratorIDs() ([]int, error) {
|
||||
accels := make([]int, 0)
|
||||
for _, accel := range topo.Accelerators {
|
||||
id, err := strconv.Atoi(accel.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accels = append(accels, id)
|
||||
}
|
||||
return accels, nil
|
||||
}
|
||||
|
||||
func (topo *Topology) GetAcceleratorIndex(id string) (int, bool) {
|
||||
for idx, accel := range topo.Accelerators {
|
||||
if accel.ID == id {
|
||||
return idx, true
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
SubClusters []*schema.SubCluster `json:"subClusters"`
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
var Clusters []*Cluster
|
||||
@ -42,7 +43,7 @@ func initClusterConfig() error {
|
||||
|
||||
// For backwards compability...
|
||||
if mc.Scope == "" {
|
||||
mc.Scope = MetricScopeNode
|
||||
mc.Scope = schema.MetricScopeNode
|
||||
}
|
||||
if !mc.Scope.Valid() {
|
||||
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
|
||||
@ -78,7 +79,7 @@ func GetCluster(cluster string) *Cluster {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetSubCluster(cluster, subcluster string) *SubCluster {
|
||||
func GetSubCluster(cluster, subcluster string) *schema.SubCluster {
|
||||
|
||||
for _, c := range Clusters {
|
||||
if c.Name == cluster {
|
||||
|
@ -7,6 +7,8 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
func DecodeJobData(r io.Reader, k string) (JobData, error) {
|
||||
@ -57,3 +59,12 @@ func EncodeJobMeta(w io.Writer, d *JobMeta) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func EncodeCluster(w io.Writer, c *schema.Cluster) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -9,10 +9,37 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/units"
|
||||
)
|
||||
|
||||
var ar FsArchive
|
||||
|
||||
func deepCopyClusterConfig(co *Cluster) schema.Cluster {
|
||||
var cn schema.Cluster
|
||||
|
||||
cn.Name = co.Name
|
||||
cn.SubClusters = co.SubClusters
|
||||
|
||||
for _, mco := range co.MetricConfig {
|
||||
var mcn schema.MetricConfig
|
||||
mcn.Name = mco.Name
|
||||
mcn.Scope = mco.Scope
|
||||
mcn.Aggregation = mco.Aggregation
|
||||
mcn.Timestep = mco.Timestep
|
||||
mcn.Peak = mco.Peak
|
||||
mcn.Normal = mco.Normal
|
||||
mcn.Caution = mco.Caution
|
||||
mcn.Alert = mco.Alert
|
||||
mcn.Unit = units.ConvertUnitString(mco.Unit)
|
||||
cn.MetricConfig = append(cn.MetricConfig, &mcn)
|
||||
}
|
||||
|
||||
return cn
|
||||
}
|
||||
|
||||
func main() {
|
||||
var srcPath string
|
||||
var dstPath string
|
||||
@ -26,7 +53,38 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for job := range ar.Iter() {
|
||||
fmt.Printf("Job %d\n", job.JobID)
|
||||
err = initClusterConfig()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// setup new job archive
|
||||
err = os.Mkdir(dstPath, 0750)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, c := range Clusters {
|
||||
path := fmt.Sprintf("%s/%s", dstPath, c.Name)
|
||||
fmt.Println(path)
|
||||
err = os.Mkdir(path, 0750)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
cn := deepCopyClusterConfig(c)
|
||||
|
||||
f, err := os.Create(fmt.Sprintf("%s/%s/cluster.json", dstPath, c.Name))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := EncodeCluster(f, &cn); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// for job := range ar.Iter() {
|
||||
// fmt.Printf("Job %d\n", job.JobID)
|
||||
// }
|
||||
}
|
||||
|
@ -5,18 +5,14 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sort"
|
||||
"unsafe"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
type JobData map[string]map[MetricScope]*JobMetric
|
||||
type JobData map[string]map[schema.MetricScope]*JobMetric
|
||||
|
||||
type JobMetric struct {
|
||||
Unit string `json:"unit"`
|
||||
Scope MetricScope `json:"scope"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
Timestep int `json:"timestep"`
|
||||
Series []Series `json:"series"`
|
||||
StatisticsSeries *StatsSeries `json:"statisticsSeries"`
|
||||
@ -42,282 +38,28 @@ type StatsSeries struct {
|
||||
Percentiles map[int][]Float `json:"percentiles,omitempty"`
|
||||
}
|
||||
|
||||
type MetricScope string
|
||||
// type MetricScope string
|
||||
|
||||
const (
|
||||
MetricScopeInvalid MetricScope = "invalid_scope"
|
||||
// const (
|
||||
// MetricScopeInvalid MetricScope = "invalid_scope"
|
||||
|
||||
MetricScopeNode MetricScope = "node"
|
||||
MetricScopeSocket MetricScope = "socket"
|
||||
MetricScopeMemoryDomain MetricScope = "memoryDomain"
|
||||
MetricScopeCore MetricScope = "core"
|
||||
MetricScopeHWThread MetricScope = "hwthread"
|
||||
// MetricScopeNode MetricScope = "node"
|
||||
// MetricScopeSocket MetricScope = "socket"
|
||||
// MetricScopeMemoryDomain MetricScope = "memoryDomain"
|
||||
// MetricScopeCore MetricScope = "core"
|
||||
// MetricScopeHWThread MetricScope = "hwthread"
|
||||
|
||||
MetricScopeAccelerator MetricScope = "accelerator"
|
||||
)
|
||||
// MetricScopeAccelerator MetricScope = "accelerator"
|
||||
// )
|
||||
|
||||
var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
|
||||
MetricScopeNode: 10,
|
||||
MetricScopeSocket: 5,
|
||||
MetricScopeMemoryDomain: 3,
|
||||
MetricScopeCore: 2,
|
||||
MetricScopeHWThread: 1,
|
||||
// var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
|
||||
// MetricScopeNode: 10,
|
||||
// MetricScopeSocket: 5,
|
||||
// MetricScopeMemoryDomain: 3,
|
||||
// MetricScopeCore: 2,
|
||||
// MetricScopeHWThread: 1,
|
||||
|
||||
MetricScopeAccelerator: 5, // Special/Randomly choosen
|
||||
// MetricScopeAccelerator: 5, // Special/Randomly choosen
|
||||
|
||||
MetricScopeInvalid: -1,
|
||||
}
|
||||
|
||||
func (e *MetricScope) LT(other MetricScope) bool {
|
||||
a := metricScopeGranularity[*e]
|
||||
b := metricScopeGranularity[other]
|
||||
return a < b
|
||||
}
|
||||
|
||||
func (e *MetricScope) LTE(other MetricScope) bool {
|
||||
a := metricScopeGranularity[*e]
|
||||
b := metricScopeGranularity[other]
|
||||
return a <= b
|
||||
}
|
||||
|
||||
func (e *MetricScope) Max(other MetricScope) MetricScope {
|
||||
a := metricScopeGranularity[*e]
|
||||
b := metricScopeGranularity[other]
|
||||
if a > b {
|
||||
return *e
|
||||
}
|
||||
return other
|
||||
}
|
||||
|
||||
func (e *MetricScope) UnmarshalGQL(v interface{}) error {
|
||||
str, ok := v.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("enums must be strings")
|
||||
}
|
||||
|
||||
*e = MetricScope(str)
|
||||
if !e.Valid() {
|
||||
return fmt.Errorf("%s is not a valid MetricScope", str)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e MetricScope) MarshalGQL(w io.Writer) {
|
||||
fmt.Fprintf(w, "\"%s\"", e)
|
||||
}
|
||||
|
||||
func (e MetricScope) Valid() bool {
|
||||
gran, ok := metricScopeGranularity[e]
|
||||
return ok && gran > 0
|
||||
}
|
||||
|
||||
func (jd *JobData) Size() int {
|
||||
n := 128
|
||||
for _, scopes := range *jd {
|
||||
for _, metric := range scopes {
|
||||
if metric.StatisticsSeries != nil {
|
||||
n += len(metric.StatisticsSeries.Max)
|
||||
n += len(metric.StatisticsSeries.Mean)
|
||||
n += len(metric.StatisticsSeries.Min)
|
||||
}
|
||||
|
||||
for _, series := range metric.Series {
|
||||
n += len(series.Data)
|
||||
}
|
||||
}
|
||||
}
|
||||
return n * int(unsafe.Sizeof(Float(0)))
|
||||
}
|
||||
|
||||
const smooth bool = false
|
||||
|
||||
func (jm *JobMetric) AddStatisticsSeries() {
|
||||
if jm.StatisticsSeries != nil || len(jm.Series) < 4 {
|
||||
return
|
||||
}
|
||||
|
||||
n, m := 0, len(jm.Series[0].Data)
|
||||
for _, series := range jm.Series {
|
||||
if len(series.Data) > n {
|
||||
n = len(series.Data)
|
||||
}
|
||||
if len(series.Data) < m {
|
||||
m = len(series.Data)
|
||||
}
|
||||
}
|
||||
|
||||
min, mean, max := make([]Float, n), make([]Float, n), make([]Float, n)
|
||||
i := 0
|
||||
for ; i < m; i++ {
|
||||
smin, ssum, smax := math.MaxFloat32, 0.0, -math.MaxFloat32
|
||||
notnan := 0
|
||||
for j := 0; j < len(jm.Series); j++ {
|
||||
x := float64(jm.Series[j].Data[i])
|
||||
if math.IsNaN(x) {
|
||||
continue
|
||||
}
|
||||
|
||||
notnan += 1
|
||||
ssum += x
|
||||
smin = math.Min(smin, x)
|
||||
smax = math.Max(smax, x)
|
||||
}
|
||||
|
||||
if notnan < 3 {
|
||||
min[i] = NaN
|
||||
mean[i] = NaN
|
||||
max[i] = NaN
|
||||
} else {
|
||||
min[i] = Float(smin)
|
||||
mean[i] = Float(ssum / float64(notnan))
|
||||
max[i] = Float(smax)
|
||||
}
|
||||
}
|
||||
|
||||
for ; i < n; i++ {
|
||||
min[i] = NaN
|
||||
mean[i] = NaN
|
||||
max[i] = NaN
|
||||
}
|
||||
|
||||
if smooth {
|
||||
for i := 2; i < len(mean)-2; i++ {
|
||||
if min[i].IsNaN() {
|
||||
continue
|
||||
}
|
||||
|
||||
min[i] = (min[i-2] + min[i-1] + min[i] + min[i+1] + min[i+2]) / 5
|
||||
max[i] = (max[i-2] + max[i-1] + max[i] + max[i+1] + max[i+2]) / 5
|
||||
mean[i] = (mean[i-2] + mean[i-1] + mean[i] + mean[i+1] + mean[i+2]) / 5
|
||||
}
|
||||
}
|
||||
|
||||
jm.StatisticsSeries = &StatsSeries{Mean: mean, Min: min, Max: max}
|
||||
}
|
||||
|
||||
func (jd *JobData) AddNodeScope(metric string) bool {
|
||||
scopes, ok := (*jd)[metric]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
var maxScope MetricScope = MetricScopeInvalid
|
||||
for scope := range scopes {
|
||||
maxScope = maxScope.Max(scope)
|
||||
}
|
||||
|
||||
if maxScope == MetricScopeInvalid || maxScope == MetricScopeNode {
|
||||
return false
|
||||
}
|
||||
|
||||
jm := scopes[maxScope]
|
||||
hosts := make(map[string][]Series, 32)
|
||||
for _, series := range jm.Series {
|
||||
hosts[series.Hostname] = append(hosts[series.Hostname], series)
|
||||
}
|
||||
|
||||
nodeJm := &JobMetric{
|
||||
Unit: jm.Unit,
|
||||
Scope: MetricScopeNode,
|
||||
Timestep: jm.Timestep,
|
||||
Series: make([]Series, 0, len(hosts)),
|
||||
}
|
||||
for hostname, series := range hosts {
|
||||
min, sum, max := math.MaxFloat32, 0.0, -math.MaxFloat32
|
||||
for _, series := range series {
|
||||
if series.Statistics == nil {
|
||||
min, sum, max = math.NaN(), math.NaN(), math.NaN()
|
||||
break
|
||||
}
|
||||
sum += series.Statistics.Avg
|
||||
min = math.Min(min, series.Statistics.Min)
|
||||
max = math.Max(max, series.Statistics.Max)
|
||||
}
|
||||
|
||||
n, m := 0, len(jm.Series[0].Data)
|
||||
for _, series := range jm.Series {
|
||||
if len(series.Data) > n {
|
||||
n = len(series.Data)
|
||||
}
|
||||
if len(series.Data) < m {
|
||||
m = len(series.Data)
|
||||
}
|
||||
}
|
||||
|
||||
i, data := 0, make([]Float, len(series[0].Data))
|
||||
for ; i < m; i++ {
|
||||
x := Float(0.0)
|
||||
for _, series := range jm.Series {
|
||||
x += series.Data[i]
|
||||
}
|
||||
data[i] = x
|
||||
}
|
||||
|
||||
for ; i < n; i++ {
|
||||
data[i] = NaN
|
||||
}
|
||||
|
||||
nodeJm.Series = append(nodeJm.Series, Series{
|
||||
Hostname: hostname,
|
||||
Statistics: &MetricStatistics{Min: min, Avg: sum / float64(len(series)), Max: max},
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
scopes[MetricScopeNode] = nodeJm
|
||||
return true
|
||||
}
|
||||
|
||||
func (jm *JobMetric) AddPercentiles(ps []int) bool {
|
||||
if jm.StatisticsSeries == nil {
|
||||
jm.AddStatisticsSeries()
|
||||
}
|
||||
|
||||
if len(jm.Series) < 3 {
|
||||
return false
|
||||
}
|
||||
|
||||
if jm.StatisticsSeries.Percentiles == nil {
|
||||
jm.StatisticsSeries.Percentiles = make(map[int][]Float, len(ps))
|
||||
}
|
||||
|
||||
n := 0
|
||||
for _, series := range jm.Series {
|
||||
if len(series.Data) > n {
|
||||
n = len(series.Data)
|
||||
}
|
||||
}
|
||||
|
||||
data := make([][]float64, n)
|
||||
for i := 0; i < n; i++ {
|
||||
vals := make([]float64, 0, len(jm.Series))
|
||||
for _, series := range jm.Series {
|
||||
if i < len(series.Data) {
|
||||
vals = append(vals, float64(series.Data[i]))
|
||||
}
|
||||
}
|
||||
|
||||
sort.Float64s(vals)
|
||||
data[i] = vals
|
||||
}
|
||||
|
||||
for _, p := range ps {
|
||||
if p < 1 || p > 99 {
|
||||
panic("invalid percentile")
|
||||
}
|
||||
|
||||
if _, ok := jm.StatisticsSeries.Percentiles[p]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
percentiles := make([]Float, n)
|
||||
for i := 0; i < n; i++ {
|
||||
sorted := data[i]
|
||||
percentiles[i] = Float(sorted[(len(sorted)*p)/100])
|
||||
}
|
||||
|
||||
jm.StatisticsSeries.Percentiles[p] = percentiles
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
// MetricScopeInvalid: -1,
|
||||
// }
|
||||
|
Loading…
Reference in New Issue
Block a user