add scopes, paging and backend filtering to nodeList

This commit is contained in:
Christoph Kluge
2025-01-09 18:56:50 +01:00
parent e871703724
commit 2a3383e9e6
17 changed files with 2300 additions and 565 deletions

View File

@@ -11,6 +11,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"time"
@@ -44,7 +45,6 @@ type CCMetricStore struct {
type ApiQueryRequest struct {
Cluster string `json:"cluster"`
Queries []ApiQuery `json:"queries"`
NodeQuery NodeQuery `json:"node-query"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
@@ -63,19 +63,6 @@ type ApiQuery struct {
Aggregate bool `json:"aggreg"`
}
type NodeQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metrics []string `json:"metrics"`
NodeFilter string `json:"node-filter"`
Resolution int `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
Aggregate bool `json:"aggreg"`
Page int `json:"page"`
ItemsPerPage int `json:"items-per-page"`
}
type ApiQueryResponse struct {
Queries []ApiQuery `json:"queries,omitempty"`
Results [][]ApiMetricData `json:"results"`
@@ -712,9 +699,13 @@ func (ccms *CCMetricStore) LoadNodeListData(
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page model.PageRequest,
page *model.PageRequest,
ctx context.Context,
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, error) {
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, int, bool, error) {
// 0) Init additional vars
var totalNodes int = 0
var hasNextPage bool = false
// 1) Get list of all nodes
var nodes []string
@@ -728,8 +719,6 @@ func (ccms *CCMetricStore) LoadNodeListData(
}
}
log.Debugf(">> SEE HERE: NODES (All)! %v (Len: %d)", nodes, len(nodes))
// 2) Filter nodes
if nodeFilter != "" {
filteredNodes := []string{}
@@ -741,7 +730,9 @@ func (ccms *CCMetricStore) LoadNodeListData(
nodes = filteredNodes
}
log.Debugf(">> SEE HERE: NODES (Filtered)! %v (Len: %d)", nodes, len(nodes))
// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after ccms return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
@@ -749,16 +740,17 @@ func (ccms *CCMetricStore) LoadNodeListData(
end := start + page.ItemsPerPage
if end > len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
log.Debugf(">> SEE HERE: NODES (Paged)! %v (Len: %d)", nodes, len(nodes))
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
if err != nil {
log.Warn("Error while building queries")
return nil, err
return nil, totalNodes, hasNextPage, err
}
req := ApiQueryRequest{
@@ -773,7 +765,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
log.Error(fmt.Sprintf("Error while performing request %#v\n", err))
return nil, err
return nil, totalNodes, hasNextPage, err
}
var errors []string
@@ -795,14 +787,27 @@ func (ccms *CCMetricStore) LoadNodeListData(
res = mc.Timestep
}
nodeMetric, ok := data[query.Hostname][metric][scope]
// Init Nested Map Data Structures If Not Found
hostData, ok := data[query.Hostname]
if !ok {
nodeMetric = &schema.JobMetric{
hostData = make(map[string]map[schema.MetricScope]*schema.JobMetric)
data[query.Hostname] = hostData
}
metricData, ok := hostData[metric]
if !ok {
metricData = make(map[schema.MetricScope]*schema.JobMetric)
data[query.Hostname][metric] = metricData
}
scopeData, ok := metricData[scope]
if !ok {
scopeData = &schema.JobMetric{
Unit: mc.Unit,
Timestep: res,
Series: make([]schema.Series, 0),
}
data[query.Hostname][metric][scope] = nodeMetric
data[query.Hostname][metric][scope] = scopeData
}
for ndx, res := range row {
@@ -825,7 +830,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
res.Max = schema.Float(0)
}
nodeMetric.Series = append(nodeMetric.Series, schema.Series{
scopeData.Series = append(scopeData.Series, schema.Series{
Hostname: query.Hostname,
Id: id,
Statistics: schema.MetricStatistics{
@@ -840,12 +845,10 @@ func (ccms *CCMetricStore) LoadNodeListData(
if len(errors) != 0 {
/* Returns list of "partial errors" */
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
}
log.Debugf(">> SEE HERE: DATA (Final)! %v (Len: %d)", data, len(data))
return data, nil
return data, totalNodes, hasNextPage, nil
}
func (ccms *CCMetricStore) buildNodeQueries(

View File

@@ -320,12 +320,14 @@ func (idb *InfluxDBv2DataRepository) LoadNodeListData(
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page model.PageRequest,
page *model.PageRequest,
ctx context.Context,
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, error) {
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, int, bool, error) {
var totalNodes int = 0
var hasNextPage bool = false
// TODO : Implement to be used in NodeList-View
log.Infof("LoadNodeListData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
}

View File

@@ -31,7 +31,7 @@ type MetricDataRepository interface {
LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
// Return a map of hosts to a map of metrics to a map of scopes for multiple nodes.
LoadNodeListData(cluster, subCluster, nodeFilter string, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, page model.PageRequest, ctx context.Context) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, error)
LoadNodeListData(cluster, subCluster, nodeFilter string, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, page *model.PageRequest, ctx context.Context) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, int, bool, error)
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}

View File

@@ -454,12 +454,14 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page model.PageRequest,
page *model.PageRequest,
ctx context.Context,
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, error) {
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, int, bool, error) {
var totalNodes int = 0
var hasNextPage bool = false
// TODO : Implement to be used in NodeList-View
log.Infof("LoadNodeListData unimplemented for PrometheusDataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for PrometheusDataRepository")
return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for PrometheusDataRepository")
}

View File

@@ -57,9 +57,9 @@ func (tmdr *TestMetricDataRepository) LoadNodeListData(
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page model.PageRequest,
page *model.PageRequest,
ctx context.Context,
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, error) {
) (map[string]map[string]map[schema.MetricScope]*schema.JobMetric, int, bool, error) {
panic("TODO")
}