mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-11-01 00:15:05 +01:00
Update Node table code. Add simple unit test
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package repository
|
||||
|
||||
import (
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -50,90 +49,91 @@ func GetNodeRepository() *NodeRepository {
|
||||
return nodeRepoInstance
|
||||
}
|
||||
|
||||
func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) {
|
||||
func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[string]string, error) {
|
||||
start := time.Now()
|
||||
cachekey := fmt.Sprintf("metadata:%d", node.ID)
|
||||
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
||||
node.MetaData = cached.(map[string]string)
|
||||
return node.MetaData, nil
|
||||
}
|
||||
|
||||
if err := sq.Select("node.meta_data").From("node").Where("node.id = ?", node.ID).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&node.RawMetaData); err != nil {
|
||||
RawMetaData := make([]byte, 0)
|
||||
|
||||
if err := sq.Select("node.meta_data").From("node").
|
||||
Where("node.hostname = ?", hostname).
|
||||
Where("node.cluster = ?", cluster).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&RawMetaData); err != nil {
|
||||
cclog.Warn("Error while scanning for node metadata")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(node.RawMetaData) == 0 {
|
||||
if len(RawMetaData) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(node.RawMetaData, &node.MetaData); err != nil {
|
||||
MetaData := make(map[string]string)
|
||||
|
||||
if err := json.Unmarshal(RawMetaData, &MetaData); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw metadata json")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
|
||||
cclog.Debugf("Timer FetchMetadata %s", time.Since(start))
|
||||
return node.MetaData, nil
|
||||
return MetaData, nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err error) {
|
||||
cachekey := fmt.Sprintf("metadata:%d", node.ID)
|
||||
r.cache.Del(cachekey)
|
||||
if node.MetaData == nil {
|
||||
if _, err = r.FetchMetadata(node); err != nil {
|
||||
cclog.Warnf("Error while fetching metadata for node, DB ID '%v'", node.ID)
|
||||
return err
|
||||
}
|
||||
}
|
||||
//
|
||||
// func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err error) {
|
||||
// cachekey := fmt.Sprintf("metadata:%d", node.ID)
|
||||
// r.cache.Del(cachekey)
|
||||
// if node.MetaData == nil {
|
||||
// if _, err = r.FetchMetadata(node); err != nil {
|
||||
// cclog.Warnf("Error while fetching metadata for node, DB ID '%v'", node.ID)
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if node.MetaData != nil {
|
||||
// cpy := make(map[string]string, len(node.MetaData)+1)
|
||||
// maps.Copy(cpy, node.MetaData)
|
||||
// cpy[key] = val
|
||||
// node.MetaData = cpy
|
||||
// } else {
|
||||
// node.MetaData = map[string]string{key: val}
|
||||
// }
|
||||
//
|
||||
// if node.RawMetaData, err = json.Marshal(node.MetaData); err != nil {
|
||||
// cclog.Warnf("Error while marshaling metadata for node, DB ID '%v'", node.ID)
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// if _, err = sq.Update("node").
|
||||
// Set("meta_data", node.RawMetaData).
|
||||
// Where("node.id = ?", node.ID).
|
||||
// RunWith(r.stmtCache).Exec(); err != nil {
|
||||
// cclog.Warnf("Error while updating metadata for node, DB ID '%v'", node.ID)
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
|
||||
// return nil
|
||||
// }
|
||||
|
||||
if node.MetaData != nil {
|
||||
cpy := make(map[string]string, len(node.MetaData)+1)
|
||||
maps.Copy(cpy, node.MetaData)
|
||||
cpy[key] = val
|
||||
node.MetaData = cpy
|
||||
} else {
|
||||
node.MetaData = map[string]string{key: val}
|
||||
}
|
||||
|
||||
if node.RawMetaData, err = json.Marshal(node.MetaData); err != nil {
|
||||
cclog.Warnf("Error while marshaling metadata for node, DB ID '%v'", node.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = sq.Update("node").
|
||||
Set("meta_data", node.RawMetaData).
|
||||
Where("node.id = ?", node.ID).
|
||||
RunWith(r.stmtCache).Exec(); err != nil {
|
||||
cclog.Warnf("Error while updating metadata for node, DB ID '%v'", node.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) GetNode(id int64, withMeta bool) (*schema.Node, error) {
|
||||
func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error) {
|
||||
node := &schema.Node{}
|
||||
// if err := sq.Select("id", "hostname", "cluster", "subcluster", "node_state",
|
||||
// "health_state").From("node").
|
||||
// Where("node.id = ?", id).RunWith(r.DB).
|
||||
// QueryRow().Scan(&node.ID, &node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState,
|
||||
// &node.HealthState); err != nil {
|
||||
// cclog.Warnf("Error while querying node '%v' from database", id)
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// if withMeta {
|
||||
// var err error
|
||||
// var meta map[string]string
|
||||
// if meta, err = r.FetchMetadata(node); err != nil {
|
||||
// cclog.Warnf("Error while fetching metadata for node '%v'", id)
|
||||
// return nil, err
|
||||
// }
|
||||
// node.MetaData = meta
|
||||
// }
|
||||
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
|
||||
"node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").
|
||||
Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id").
|
||||
Where("node.hostname = ?", hostname).Where("node.cluster = ?", cluster).RunWith(r.DB).
|
||||
QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
|
||||
cclog.Warnf("Error while querying node '%s' from database: %v", hostname, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if withMeta {
|
||||
var err error
|
||||
var meta map[string]string
|
||||
if meta, err = r.FetchMetadata(hostname, cluster); err != nil {
|
||||
cclog.Warnf("Error while fetching metadata for node '%s'", hostname)
|
||||
return nil, err
|
||||
}
|
||||
node.MetaData = meta
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
@@ -151,7 +151,7 @@ INSERT INTO node (hostname, cluster, subcluster)
|
||||
|
||||
// AddNode adds a Node to the node table. This can be triggered by a node collector registration or
|
||||
// from a nodestate update from the job scheduler.
|
||||
func (r *NodeRepository) AddNode(node *schema.Node) (int64, error) {
|
||||
func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error) {
|
||||
var err error
|
||||
|
||||
res, err := r.DB.NamedExec(NamedNodeInsert, node)
|
||||
@@ -168,30 +168,15 @@ func (r *NodeRepository) AddNode(node *schema.Node) (int64, error) {
|
||||
return node.ID, nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) InsertNodeState(nodeState *schema.Node) error {
|
||||
subcluster, err := archive.GetSubClusterByNode(nodeState.Cluster, nodeState.Hostname)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while getting subcluster for node '%s' in cluster '%s': %v", nodeState.Hostname, nodeState.Cluster, err)
|
||||
return err
|
||||
}
|
||||
|
||||
nodeState.SubCluster = subcluster
|
||||
|
||||
_, err = r.DB.NamedExec(NamedNodeInsert, nodeState)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while insert node '%v' to database", nodeState.Hostname)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const NamedNodeStateInsert string = `
|
||||
INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated,
|
||||
memory_allocated, gpus_allocated, jobs_running, node_id)
|
||||
VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
|
||||
|
||||
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeState) error {
|
||||
// TODO: Add real Monitoring Health State
|
||||
|
||||
// UpdateNodeState is called from the Node REST API to add a row in the node state table
|
||||
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeStateDB) error {
|
||||
var id int64
|
||||
|
||||
if err := sq.Select("id").From("node").
|
||||
@@ -203,7 +188,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
cclog.Errorf("Error while getting subcluster for node '%s' in cluster '%s': %v", hostname, cluster, err)
|
||||
return err
|
||||
}
|
||||
node := schema.Node{
|
||||
node := schema.NodeDB{
|
||||
Hostname: hostname, Cluster: cluster, SubCluster: subcluster,
|
||||
}
|
||||
id, err = r.AddNode(&node)
|
||||
@@ -220,6 +205,8 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
}
|
||||
}
|
||||
|
||||
nodeState.NodeID = id
|
||||
|
||||
_, err := r.DB.NamedExec(NamedNodeStateInsert, nodeState)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while adding node state for '%v' to database", hostname)
|
||||
@@ -254,9 +241,11 @@ func (r *NodeRepository) QueryNodes(
|
||||
ctx context.Context,
|
||||
filters []*model.NodeFilter,
|
||||
order *model.OrderByInput, // Currently unused!
|
||||
) ([]*model.Node, error) {
|
||||
) ([]*schema.Node, error) {
|
||||
query, qerr := AccessCheck(ctx,
|
||||
sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", "node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id"))
|
||||
sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
|
||||
"node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").
|
||||
Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id"))
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
@@ -271,8 +260,8 @@ func (r *NodeRepository) QueryNodes(
|
||||
if f.Subcluster != nil {
|
||||
query = buildStringCondition("node.subcluster", f.Subcluster, query)
|
||||
}
|
||||
if f.NodeState != nil {
|
||||
query = query.Where("node.node_state = ?", f.NodeState)
|
||||
if f.SchedulerState != nil {
|
||||
query = query.Where("node.node_state = ?", f.SchedulerState)
|
||||
}
|
||||
if f.HealthState != nil {
|
||||
query = query.Where("node.health_state = ?", f.HealthState)
|
||||
@@ -286,9 +275,9 @@ func (r *NodeRepository) QueryNodes(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*model.Node, 0, 50)
|
||||
nodes := make([]*schema.Node, 0, 50)
|
||||
for rows.Next() {
|
||||
node := model.Node{}
|
||||
node := schema.Node{}
|
||||
|
||||
if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
|
||||
&node.NodeState, &node.HealthState); err != nil {
|
||||
|
||||
190
internal/repository/node_test.go
Normal file
190
internal/repository/node_test.go
Normal file
@@ -0,0 +1,190 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package repository
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// nodeTestSetup prepares an isolated environment for node repository tests:
// a minimal main/cluster configuration, a file-based job archive containing
// one test cluster ("testcluster" with subcluster "sc1"), and a fresh sqlite
// database migrated to the current schema, all rooted in t.TempDir() so
// cleanup is automatic. It aborts the test via t.Fatal on any setup error.
func nodeTestSetup(t *testing.T) {
	// Minimal cc-backend main configuration (validation off, open API access).
	const testconfig = `{
	"main": {
	"addr": "0.0.0.0:8080",
	"validate": false,
	"apiAllowedIPs": [
	  "*"
	]
	},
	"archive": {
		"kind": "file",
		"path": "./var/job-archive"
	},
	"auth": {
		"jwts": {
			"max-age": "2m"
		}
	},
	"clusters": [
	{
	   "name": "testcluster",
	   "metricDataRepository": {"kind": "test", "url": "bla:8081"},
	   "filterRanges": {
		"numNodes": { "from": 1, "to": 64 },
		"duration": { "from": 0, "to": 86400 },
		"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
	} } ]
}`
	// Cluster description written into the job archive; hosts host123-host125
	// belong to subcluster sc1, which the repository resolves via archive.
	const testclusterJSON = `{
	"name": "testcluster",
	"subClusters": [
	{
		"name": "sc1",
		"nodes": "host123,host124,host125",
		"processorType": "Intel Core i7-4770",
		"socketsPerNode": 1,
		"coresPerSocket": 4,
		"threadsPerCore": 2,
		"flopRateScalar": {
			"unit": {
				"prefix": "G",
				"base": "F/s"
			},
			"value": 14
		},
		"flopRateSimd": {
			"unit": {
				"prefix": "G",
				"base": "F/s"
			},
			"value": 112
		},
		"memoryBandwidth": {
			"unit": {
				"prefix": "G",
				"base": "B/s"
			},
			"value": 24
		},
		"numberOfNodes": 70,
		"topology": {
			"node": [0, 1, 2, 3, 4, 5, 6, 7],
			"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
			"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
			"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
			"core": [[0], [1], [2], [3], [4], [5], [6], [7]]
		}
	}
	],
	"metricConfig": [
	{
		"name": "load_one",
		"unit": { "base": ""},
		"scope": "node",
		"timestep": 60,
		"aggregation": "avg",
		"peak": 8,
		"normal": 0,
		"caution": 0,
		"alert": 0
	}
	]
}`

	cclog.Init("debug", true)
	tmpdir := t.TempDir()

	// Build the file job archive: version marker plus one cluster.json.
	jobarchive := filepath.Join(tmpdir, "job-archive")
	if err := os.Mkdir(jobarchive, 0o777); err != nil {
		t.Fatal(err)
	}

	// Archive format version 2 — presumably what archive.Init expects; the
	// archive package rejects mismatching versions. TODO confirm.
	if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"),
		fmt.Appendf(nil, "%d", 2), 0o666); err != nil {
		t.Fatal(err)
	}

	if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"),
		0o777); err != nil {
		t.Fatal(err)
	}

	if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"),
		[]byte(testclusterJSON), 0o666); err != nil {
		t.Fatal(err)
	}

	// Create and migrate a fresh sqlite database for this test run.
	dbfilepath := filepath.Join(tmpdir, "test.db")
	err := MigrateDB("sqlite3", dbfilepath)
	if err != nil {
		t.Fatal(err)
	}

	// Write the configuration file and initialize the config packages.
	cfgFilePath := filepath.Join(tmpdir, "config.json")
	if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0o666); err != nil {
		t.Fatal(err)
	}

	ccconf.Init(cfgFilePath)

	// Load and check main configuration
	if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
		if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
			config.Init(cfg, clustercfg)
		} else {
			cclog.Abort("Cluster configuration must be present")
		}
	} else {
		cclog.Abort("Main configuration must be present")
	}

	// Point the archive at the temporary job archive built above.
	archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)

	Connect("sqlite3", dbfilepath)

	if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
		t.Fatal(err)
	}
}
|
||||
|
||||
func TestUpdateNodeState(t *testing.T) {
|
||||
nodeTestSetup(t)
|
||||
|
||||
nodeState := schema.NodeStateDB{
|
||||
TimeStamp: time.Now().Unix(), NodeState: "allocated",
|
||||
CpusAllocated: 72,
|
||||
MemoryAllocated: 480,
|
||||
GpusAllocated: 0,
|
||||
HealthState: schema.MonitoringStateFull,
|
||||
JobsRunning: 1,
|
||||
}
|
||||
|
||||
repo := GetNodeRepository()
|
||||
err := repo.UpdateNodeState("host124", "testcluster", &nodeState)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
node, err := repo.GetNode("host124", "testcluster", false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if node.NodeState != "allocated" {
|
||||
t.Errorf("wrong node state\ngot: %s \nwant: allocated ", node.NodeState)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user