initial branch commit, improve countstate backend logic

- stacked component rough sketch
- gql data request pipeline layed out
This commit is contained in:
Christoph Kluge
2025-10-17 18:24:05 +02:00
parent 6efd6334bb
commit 714d6af7cd
12 changed files with 1267 additions and 55 deletions

View File

@@ -117,9 +117,13 @@ func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[str
func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error) {
node := &schema.Node{}
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").
Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id").
Where("node.hostname = ?", hostname).Where("node.cluster = ?", cluster).RunWith(r.DB).
"node_state.health_state", "MAX(node_state.time_stamp) as time").
From("node_state").
Join("node ON nodes_state.node_id = node.id").
Where("node.hostname = ?", hostname).
Where("node.cluster = ?", cluster).
GroupBy("node_state.node_id").
RunWith(r.DB).
QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
cclog.Warnf("Error while querying node '%s' from database: %v", hostname, err)
return nil, err
@@ -138,6 +142,34 @@ func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool)
return node, nil
}
func (r *NodeRepository) GetNodeById(id int64, withMeta bool) (*schema.Node, error) {
node := &schema.Node{}
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp) as time").
From("node_state").
Join("node ON nodes_state.node_id = node.id").
Where("node.id = ?", id).
GroupBy("node_state.node_id").
RunWith(r.DB).
QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
cclog.Warnf("Error while querying node ID '%d' from database: %v", id, err)
return nil, err
}
// NEEDS METADATA BY ID
// if withMeta {
// var err error
// var meta map[string]string
// if meta, err = r.FetchMetadata(hostname, cluster); err != nil {
// cclog.Warnf("Error while fetching metadata for node '%s'", hostname)
// return nil, err
// }
// node.MetaData = meta
// }
return node, nil
}
// const NamedNodeInsert string = `
// INSERT INTO node (time_stamp, hostname, cluster, subcluster, node_state, health_state,
//
@@ -244,8 +276,9 @@ func (r *NodeRepository) QueryNodes(
) ([]*schema.Node, error) {
query, qerr := AccessCheck(ctx,
sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node").
Join("node_state ON nodes_state.node_id = node.id").GroupBy("node_state.node_id"))
"node_state.health_state", "MAX(node_state.time_stamp) as time").
From("node").
Join("node_state ON nodes_state.node_id = node.id"))
if qerr != nil {
return nil, qerr
}
@@ -268,6 +301,9 @@ func (r *NodeRepository) QueryNodes(
}
}
// Add Grouping after filters
query = query.GroupBy("node_state.node_id")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
queryString, queryVars, _ := query.ToSql()
@@ -293,9 +329,12 @@ func (r *NodeRepository) QueryNodes(
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node").
Join("node_state ON node_state.node_id = node.id").GroupBy("node_state.node_id").
Where("node.cluster = ?", cluster).OrderBy("node.hostname ASC")
"node_state.health_state", "MAX(node_state.time_stamp) as time").
From("node").
Join("node_state ON node_state.node_id = node.id").
Where("node.cluster = ?", cluster).
GroupBy("node_state.node_id").
OrderBy("node.hostname ASC")
rows, err := q.RunWith(r.DB).Query()
if err != nil {
@@ -319,35 +358,33 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
}
func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
query, qerr := AccessCheck(ctx, sq.Select("node_state", "count(*) AS count").From("node"))
query, qerr := AccessCheck(ctx, sq.Select("hostname", "node_state", "MAX(time_stamp) as time").From("node"))
if qerr != nil {
return nil, qerr
}
// Get latest Info aka closest Timestamp to $now
now := time.Now().Unix()
query = query.Join("node_state ON node_state.node_id = node.id").Where(sq.Gt{"node_state.time_stamp": (now - 60)}) // .Distinct()
query = query.Join("node_state ON node_state.node_id = node.id")
for _, f := range filters {
if f.Hostname != nil {
query = buildStringCondition("node.hostname", f.Hostname, query)
query = buildStringCondition("hostname", f.Hostname, query)
}
if f.Cluster != nil {
query = buildStringCondition("node.cluster", f.Cluster, query)
query = buildStringCondition("cluster", f.Cluster, query)
}
if f.Subcluster != nil {
query = buildStringCondition("node.subcluster", f.Subcluster, query)
query = buildStringCondition("subcluster", f.Subcluster, query)
}
if f.SchedulerState != nil {
query = query.Where("node.node_state = ?", f.SchedulerState)
query = query.Where("node_state = ?", f.SchedulerState)
}
if f.HealthState != nil {
query = query.Where("node.health_state = ?", f.HealthState)
query = query.Where("health_state = ?", f.HealthState)
}
}
// Add Group and Order
query = query.GroupBy("node_state").OrderBy("count DESC")
query = query.GroupBy("hostname").OrderBy("hostname DESC")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
@@ -356,15 +393,23 @@ func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.N
return nil, err
}
nodes := make([]*model.NodeStates, 0)
stateMap := map[string]int{}
for rows.Next() {
node := model.NodeStates{}
var hostname, node_state string
var timestamp int64
if err := rows.Scan(&node.State, &node.Count); err != nil {
if err := rows.Scan(&hostname, &node_state, &timestamp); err != nil {
rows.Close()
cclog.Warn("Error while scanning rows (NodeStates)")
return nil, err
}
stateMap[node_state] += 1
}
nodes := make([]*model.NodeStates, 0)
for state, counts := range stateMap {
node := model.NodeStates{State: state, Count: counts}
nodes = append(nodes, &node)
}
@@ -372,35 +417,33 @@ func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.N
}
func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
query, qerr := AccessCheck(ctx, sq.Select("health_state", "count(*) AS count").From("node"))
query, qerr := AccessCheck(ctx, sq.Select("hostname", "health_state", "MAX(time_stamp) as time").From("node"))
if qerr != nil {
return nil, qerr
}
// Get latest Info aka closest Timestamp to $now
now := time.Now().Unix()
query = query.Join("node_state ON node_state.node_id = node.id").Where(sq.Gt{"node_state.time_stamp": (now - 60)}) // .Distinct()
query = query.Join("node_state ON node_state.node_id = node.id")
for _, f := range filters {
if f.Hostname != nil {
query = buildStringCondition("node.hostname", f.Hostname, query)
query = buildStringCondition("hostname", f.Hostname, query)
}
if f.Cluster != nil {
query = buildStringCondition("node.cluster", f.Cluster, query)
query = buildStringCondition("cluster", f.Cluster, query)
}
if f.Subcluster != nil {
query = buildStringCondition("node.subcluster", f.Subcluster, query)
query = buildStringCondition("subcluster", f.Subcluster, query)
}
if f.SchedulerState != nil {
query = query.Where("node.node_state = ?", f.SchedulerState)
query = query.Where("node_state = ?", f.SchedulerState)
}
if f.HealthState != nil {
query = query.Where("node.health_state = ?", f.HealthState)
query = query.Where("health_state = ?", f.HealthState)
}
}
// Add Group and Order
query = query.GroupBy("health_state").OrderBy("count DESC")
query = query.GroupBy("hostname").OrderBy("hostname DESC")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
@@ -409,15 +452,23 @@ func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model
return nil, err
}
nodes := make([]*model.NodeStates, 0)
stateMap := map[string]int{}
for rows.Next() {
node := model.NodeStates{}
var hostname, health_state string
var timestamp int64
if err := rows.Scan(&node.State, &node.Count); err != nil {
if err := rows.Scan(&hostname, &health_state, &timestamp); err != nil {
rows.Close()
cclog.Warn("Error while scanning rows (NodeStates)")
return nil, err
}
stateMap[health_state] += 1
}
nodes := make([]*model.NodeStates, 0)
for state, counts := range stateMap {
node := model.NodeStates{State: state, Count: counts}
nodes = append(nodes, &node)
}