mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-11-01 00:15:05 +01:00
Sync commit
Does not work yet
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
-- sqlfluff:dialect:sqlite
|
||||
--
|
||||
CREATE TABLE "node" (
|
||||
id INTEGER PRIMARY KEY,
|
||||
hostname VARCHAR(255) NOT NULL,
|
||||
@@ -13,9 +11,6 @@ CREATE TABLE "node_state" (
|
||||
id INTEGER PRIMARY KEY,
|
||||
time_stamp INTEGER NOT NULL,
|
||||
jobs_running INTEGER DEFAULT 0 NOT NULL,
|
||||
cpus_total INTEGER DEFAULT 0 NOT NULL,
|
||||
memory_total INTEGER DEFAULT 0 NOT NULL,
|
||||
gpus_total INTEGER DEFAULT 0 NOT NULL,
|
||||
cpus_allocated INTEGER DEFAULT 0 NOT NULL,
|
||||
memory_allocated INTEGER DEFAULT 0 NOT NULL,
|
||||
gpus_allocated INTEGER DEFAULT 0 NOT NULL,
|
||||
@@ -35,10 +30,6 @@ CREATE TABLE "node_state" (
|
||||
-- Add Indices For New Node Table VARCHAR Fields
|
||||
CREATE INDEX IF NOT EXISTS nodes_cluster ON node (cluster);
|
||||
CREATE INDEX IF NOT EXISTS nodes_cluster_subcluster ON node (cluster, subcluster);
|
||||
CREATE INDEX IF NOT EXISTS nodes_state ON node (node_state);
|
||||
CREATE INDEX IF NOT EXISTS nodes_cluster_state ON node (cluster, node_state);
|
||||
CREATE INDEX IF NOT EXISTS nodes_health ON node (health_state);
|
||||
CREATE INDEX IF NOT EXISTS nodes_cluster_health ON node (cluster, health_state);
|
||||
|
||||
-- Add Indices For Increased Amounts of Tags
|
||||
CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package repository
|
||||
|
||||
import (
|
||||
@@ -49,12 +50,6 @@ func GetNodeRepository() *NodeRepository {
|
||||
return nodeRepoInstance
|
||||
}
|
||||
|
||||
var nodeColumns []string = []string{
|
||||
// "node.id,"
|
||||
"node.hostname", "node.cluster", "node.subcluster",
|
||||
"node.node_state", "node.health_state", // "node.meta_data",
|
||||
}
|
||||
|
||||
func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) {
|
||||
start := time.Now()
|
||||
cachekey := fmt.Sprintf("metadata:%d", node.ID)
|
||||
@@ -121,24 +116,24 @@ func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err
|
||||
|
||||
func (r *NodeRepository) GetNode(id int64, withMeta bool) (*schema.Node, error) {
|
||||
node := &schema.Node{}
|
||||
if err := sq.Select("id", "hostname", "cluster", "subcluster", "node_state",
|
||||
"health_state").From("node").
|
||||
Where("node.id = ?", id).RunWith(r.DB).
|
||||
QueryRow().Scan(&node.ID, &node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState,
|
||||
&node.HealthState); err != nil {
|
||||
cclog.Warnf("Error while querying node '%v' from database", id)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if withMeta {
|
||||
var err error
|
||||
var meta map[string]string
|
||||
if meta, err = r.FetchMetadata(node); err != nil {
|
||||
cclog.Warnf("Error while fetching metadata for node '%v'", id)
|
||||
return nil, err
|
||||
}
|
||||
node.MetaData = meta
|
||||
}
|
||||
// if err := sq.Select("id", "hostname", "cluster", "subcluster", "node_state",
|
||||
// "health_state").From("node").
|
||||
// Where("node.id = ?", id).RunWith(r.DB).
|
||||
// QueryRow().Scan(&node.ID, &node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState,
|
||||
// &node.HealthState); err != nil {
|
||||
// cclog.Warnf("Error while querying node '%v' from database", id)
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// if withMeta {
|
||||
// var err error
|
||||
// var meta map[string]string
|
||||
// if meta, err = r.FetchMetadata(node); err != nil {
|
||||
// cclog.Warnf("Error while fetching metadata for node '%v'", id)
|
||||
// return nil, err
|
||||
// }
|
||||
// node.MetaData = meta
|
||||
// }
|
||||
|
||||
return node, nil
|
||||
}
|
||||
@@ -192,8 +187,9 @@ func (r *NodeRepository) InsertNodeState(nodeState *schema.Node) error {
|
||||
}
|
||||
|
||||
const NamedNodeStateInsert string = `
|
||||
INSERT INTO node (hostname, cluster, subcluster)
|
||||
VALUES (:hostname, :cluster, :subcluster);`
|
||||
INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated,
|
||||
memory_allocated, gpus_allocated, jobs_running, node_id)
|
||||
VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
|
||||
|
||||
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeState) error {
|
||||
var id int64
|
||||
@@ -208,8 +204,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
return err
|
||||
}
|
||||
node := schema.Node{
|
||||
Hostname: hostname, Cluster: cluster, SubCluster: subcluster, NodeState: *nodeState,
|
||||
HealthState: schema.MonitoringStateFull,
|
||||
Hostname: hostname, Cluster: cluster, SubCluster: subcluster,
|
||||
}
|
||||
id, err = r.AddNode(&node)
|
||||
if err != nil {
|
||||
@@ -225,11 +220,12 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := sq.Insert("node_state").Set("node_state", nodeState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
|
||||
cclog.Errorf("error while updating node '%s'", hostname)
|
||||
_, err := r.DB.NamedExec(NamedNodeStateInsert, nodeState)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while adding node state for '%v' to database", hostname)
|
||||
return err
|
||||
}
|
||||
cclog.Infof("Updated node '%s' in database", hostname)
|
||||
cclog.Infof("Updated node state for '%s' in database", hostname)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -252,13 +248,15 @@ func (r *NodeRepository) DeleteNode(id int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Implement order by
|
||||
// QueryNodes returns a list of nodes based on a node filter. It always operates
|
||||
// on the last state (largest timestamp).
|
||||
func (r *NodeRepository) QueryNodes(
|
||||
ctx context.Context,
|
||||
filters []*model.NodeFilter,
|
||||
order *model.OrderByInput, // Currently unused!
|
||||
) ([]*schema.Node, error) {
|
||||
query, qerr := AccessCheck(ctx, sq.Select(nodeColumns...).From("node"))
|
||||
) ([]*model.Node, error) {
|
||||
query, qerr := AccessCheck(ctx,
|
||||
sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", "node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id"))
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
@@ -288,9 +286,9 @@ func (r *NodeRepository) QueryNodes(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*schema.Node, 0, 50)
|
||||
nodes := make([]*model.Node, 0, 50)
|
||||
for rows.Next() {
|
||||
node := schema.Node{}
|
||||
node := model.Node{}
|
||||
|
||||
if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
|
||||
&node.NodeState, &node.HealthState); err != nil {
|
||||
@@ -304,128 +302,104 @@ func (r *NodeRepository) QueryNodes(
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
|
||||
q := sq.Select("hostname", "cluster", "subcluster", "node_state",
|
||||
"health_state").From("node").Where("node.cluster = ?", cluster).OrderBy("node.hostname ASC")
|
||||
|
||||
rows, err := q.RunWith(r.DB).Query()
|
||||
if err != nil {
|
||||
cclog.Warn("Error while querying user list")
|
||||
return nil, err
|
||||
}
|
||||
nodeList := make([]*schema.Node, 0, 100)
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
node := &schema.Node{}
|
||||
if err := rows.Scan(&node.Hostname, &node.Cluster,
|
||||
&node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
|
||||
cclog.Warn("Error while scanning node list")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeList = append(nodeList, node)
|
||||
}
|
||||
|
||||
return nodeList, nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
|
||||
query, qerr := AccessCheck(ctx, sq.Select("node_state AS state", "count(*) AS count").From("node"))
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
for _, f := range filters {
|
||||
if f.Hostname != nil {
|
||||
query = buildStringCondition("node.hostname", f.Hostname, query)
|
||||
}
|
||||
if f.Cluster != nil {
|
||||
query = buildStringCondition("node.cluster", f.Cluster, query)
|
||||
}
|
||||
if f.Subcluster != nil {
|
||||
query = buildStringCondition("node.subcluster", f.Subcluster, query)
|
||||
}
|
||||
if f.NodeState != nil {
|
||||
query = query.Where("node.node_state = ?", f.NodeState)
|
||||
}
|
||||
if f.HealthState != nil {
|
||||
query = query.Where("node.health_state = ?", f.HealthState)
|
||||
}
|
||||
}
|
||||
|
||||
// Add Group and Order
|
||||
query = query.GroupBy("state").OrderBy("count DESC")
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
queryString, queryVars, _ := query.ToSql()
|
||||
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*model.NodeStates, 0)
|
||||
for rows.Next() {
|
||||
node := model.NodeStates{}
|
||||
|
||||
if err := rows.Scan(&node.State, &node.Count); err != nil {
|
||||
rows.Close()
|
||||
cclog.Warn("Error while scanning rows (NodeStates)")
|
||||
return nil, err
|
||||
}
|
||||
nodes = append(nodes, &node)
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
|
||||
query, qerr := AccessCheck(ctx, sq.Select("health_state AS state", "count(*) AS count").From("node"))
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
for _, f := range filters {
|
||||
if f.Hostname != nil {
|
||||
query = buildStringCondition("node.hostname", f.Hostname, query)
|
||||
}
|
||||
if f.Cluster != nil {
|
||||
query = buildStringCondition("node.cluster", f.Cluster, query)
|
||||
}
|
||||
if f.Subcluster != nil {
|
||||
query = buildStringCondition("node.subcluster", f.Subcluster, query)
|
||||
}
|
||||
if f.NodeState != nil {
|
||||
query = query.Where("node.node_state = ?", f.NodeState)
|
||||
}
|
||||
if f.HealthState != nil {
|
||||
query = query.Where("node.health_state = ?", f.HealthState)
|
||||
}
|
||||
}
|
||||
|
||||
// Add Group and Order
|
||||
query = query.GroupBy("state").OrderBy("count DESC")
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
queryString, queryVars, _ := query.ToSql()
|
||||
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*model.NodeStates, 0)
|
||||
for rows.Next() {
|
||||
node := model.NodeStates{}
|
||||
|
||||
if err := rows.Scan(&node.State, &node.Count); err != nil {
|
||||
rows.Close()
|
||||
cclog.Warn("Error while scanning rows (NodeStates)")
|
||||
return nil, err
|
||||
}
|
||||
nodes = append(nodes, &node)
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
//
|
||||
// func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
|
||||
// query, qerr := AccessCheck(ctx, sq.Select("node_state AS state", "count(*) AS count").From("node"))
|
||||
// if qerr != nil {
|
||||
// return nil, qerr
|
||||
// }
|
||||
//
|
||||
// for _, f := range filters {
|
||||
// if f.Hostname != nil {
|
||||
// query = buildStringCondition("node.hostname", f.Hostname, query)
|
||||
// }
|
||||
// if f.Cluster != nil {
|
||||
// query = buildStringCondition("node.cluster", f.Cluster, query)
|
||||
// }
|
||||
// if f.Subcluster != nil {
|
||||
// query = buildStringCondition("node.subcluster", f.Subcluster, query)
|
||||
// }
|
||||
// if f.NodeState != nil {
|
||||
// query = query.Where("node.node_state = ?", f.NodeState)
|
||||
// }
|
||||
// if f.HealthState != nil {
|
||||
// query = query.Where("node.health_state = ?", f.HealthState)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Add Group and Order
|
||||
// query = query.GroupBy("state").OrderBy("count DESC")
|
||||
//
|
||||
// rows, err := query.RunWith(r.stmtCache).Query()
|
||||
// if err != nil {
|
||||
// queryString, queryVars, _ := query.ToSql()
|
||||
// cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// nodes := make([]*model.NodeStates, 0)
|
||||
// for rows.Next() {
|
||||
// node := model.NodeStates{}
|
||||
//
|
||||
// if err := rows.Scan(&node.State, &node.Count); err != nil {
|
||||
// rows.Close()
|
||||
// cclog.Warn("Error while scanning rows (NodeStates)")
|
||||
// return nil, err
|
||||
// }
|
||||
// nodes = append(nodes, &node)
|
||||
// }
|
||||
//
|
||||
// return nodes, nil
|
||||
// }
|
||||
//
|
||||
// func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) {
|
||||
// query, qerr := AccessCheck(ctx, sq.Select("health_state AS state", "count(*) AS count").From("node"))
|
||||
// if qerr != nil {
|
||||
// return nil, qerr
|
||||
// }
|
||||
//
|
||||
// for _, f := range filters {
|
||||
// if f.Hostname != nil {
|
||||
// query = buildStringCondition("node.hostname", f.Hostname, query)
|
||||
// }
|
||||
// if f.Cluster != nil {
|
||||
// query = buildStringCondition("node.cluster", f.Cluster, query)
|
||||
// }
|
||||
// if f.Subcluster != nil {
|
||||
// query = buildStringCondition("node.subcluster", f.Subcluster, query)
|
||||
// }
|
||||
// if f.NodeState != nil {
|
||||
// query = query.Where("node.node_state = ?", f.NodeState)
|
||||
// }
|
||||
// if f.HealthState != nil {
|
||||
// query = query.Where("node.health_state = ?", f.HealthState)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Add Group and Order
|
||||
// query = query.GroupBy("state").OrderBy("count DESC")
|
||||
//
|
||||
// rows, err := query.RunWith(r.stmtCache).Query()
|
||||
// if err != nil {
|
||||
// queryString, queryVars, _ := query.ToSql()
|
||||
// cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// nodes := make([]*model.NodeStates, 0)
|
||||
// for rows.Next() {
|
||||
// node := model.NodeStates{}
|
||||
//
|
||||
// if err := rows.Scan(&node.State, &node.Count); err != nil {
|
||||
// rows.Close()
|
||||
// cclog.Warn("Error while scanning rows (NodeStates)")
|
||||
// return nil, err
|
||||
// }
|
||||
// nodes = append(nodes, &node)
|
||||
// }
|
||||
//
|
||||
// return nodes, nil
|
||||
// }
|
||||
|
||||
func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
|
||||
user := GetUserFromContext(ctx)
|
||||
|
||||
Reference in New Issue
Block a user