Add node repository, extend GraphQL API

Sync commit.
Jan Eitzinger 2025-06-04 13:44:37 +02:00
parent 8b1b99ba35
commit 6f9737c2c2
5 changed files with 1315 additions and 314 deletions


@@ -4,61 +4,78 @@ scalar Any
scalar NullableFloat
scalar MetricScope
scalar JobState
scalar NodeState
scalar MonitoringState

type Node {
  id: ID!
  hostname: String!
  cluster: String!
  subCluster: String!
  nodeState: NodeState!
  HealthState: MonitoringState!
  metaData: Any
}

type NodeStats {
  state: String!
  count: Int!
}

type Job {
  id: ID!
  jobId: Int!
  user: String!
  project: String!
  cluster: String!
  subCluster: String!
  startTime: Time!
  duration: Int!
  walltime: Int!
  numNodes: Int!
  numHWThreads: Int!
  numAcc: Int!
  energy: Float!
  SMT: Int!
  exclusive: Int!
  partition: String!
  arrayJobId: Int!
  monitoringStatus: Int!
  state: JobState!
  tags: [Tag!]!
  resources: [Resource!]!
  concurrentJobs: JobLinkResultList
  footprint: [FootprintValue]
  energyFootprint: [EnergyFootprintValue]
  metaData: Any
  userData: User
}

type JobLink {
  id: ID!
  jobId: Int!
}

type Cluster {
  name: String!
  partitions: [String!]!   # Slurm partitions
  subClusters: [SubCluster!]! # Hardware partitions/subclusters
}

type SubCluster {
  name: String!
  nodes: String!
  numberOfNodes: Int!
  processorType: String!
  socketsPerNode: Int!
  coresPerSocket: Int!
  threadsPerCore: Int!
  flopRateScalar: MetricValue!
  flopRateSimd: MetricValue!
  memoryBandwidth: MetricValue!
  topology: Topology!
  metricConfig: [MetricConfig!]!
  footprint: [String!]!
}

type FootprintValue {
@@ -80,94 +97,94 @@ type MetricValue {
}

type Topology {
  node: [Int!]
  socket: [[Int!]!]
  memoryDomain: [[Int!]!]
  die: [[Int!]!]
  core: [[Int!]!]
  accelerators: [Accelerator!]
}

type Accelerator {
  id: String!
  type: String!
  model: String!
}

type SubClusterConfig {
  name: String!
  peak: Float
  normal: Float
  caution: Float
  alert: Float
  remove: Boolean
}

type MetricConfig {
  name: String!
  unit: Unit!
  scope: MetricScope!
  aggregation: String!
  timestep: Int!
  peak: Float!
  normal: Float
  caution: Float!
  alert: Float!
  lowerIsBetter: Boolean
  subClusters: [SubClusterConfig!]!
}

type Tag {
  id: ID!
  type: String!
  name: String!
  scope: String!
}

type Resource {
  hostname: String!
  hwthreads: [Int!]
  accelerators: [String!]
  configuration: String
}

type JobMetricWithName {
  name: String!
  scope: MetricScope!
  metric: JobMetric!
}

type JobMetric {
  unit: Unit
  timestep: Int!
  series: [Series!]
  statisticsSeries: StatsSeries
}

type Series {
  hostname: String!
  id: String
  statistics: MetricStatistics
  data: [NullableFloat!]!
}

type StatsSeries {
  mean: [NullableFloat!]!
  median: [NullableFloat!]!
  min: [NullableFloat!]!
  max: [NullableFloat!]!
}

type NamedStatsWithScope {
  name: String!
  scope: MetricScope!
  stats: [ScopedStats!]!
}

type ScopedStats {
  hostname: String!
  id: String
  data: MetricStatistics!
}

type JobStats {
@@ -184,8 +201,8 @@ type JobStats {
}

type NamedStats {
  name: String!
  data: MetricStatistics!
}

type Unit {
@@ -201,12 +218,12 @@ type MetricStatistics {

type MetricFootprints {
  metric: String!
  data: [NullableFloat!]!
}

type Footprints {
  timeWeights: TimeWeights!
  metrics: [MetricFootprints!]!
}

type TimeWeights {
@@ -215,20 +232,33 @@ type TimeWeights {
  coreHours: [NullableFloat!]!
}

enum Aggregate {
  USER
  PROJECT
  CLUSTER
}

enum SortByAggregate {
  TOTALWALLTIME
  TOTALJOBS
  TOTALNODES
  TOTALNODEHOURS
  TOTALCORES
  TOTALCOREHOURS
  TOTALACCS
  TOTALACCHOURS
}

type NodeMetrics {
  host: String!
  subCluster: String!
  metrics: [JobMetricWithName!]!
}

type NodesResultList {
  items: [NodeMetrics!]!
  offset: Int
  limit: Int
  count: Int
  totalNodes: Int
  hasNextPage: Boolean
}

@@ -247,14 +277,14 @@ type GlobalMetricListItem {
}

type Count {
  name: String!
  count: Int!
}

type User {
  username: String!
  name: String!
  email: String!
}

input MetricStatItem {
@@ -263,27 +293,81 @@ input MetricStatItem {
}

type Query {
  clusters: [Cluster!]! # List of all clusters
  tags: [Tag!]! # List of all tags
  globalMetrics: [GlobalMetricListItem!]!

  user(username: String!): User
  allocatedNodes(cluster: String!): [Count!]!

  node(id: ID!): Node
  nodes(filter: [NodeFilter!], order: OrderByInput): NodesResultList!
  nodeStats(filter: [NodeFilter!]): [NodeStats!]!

  job(id: ID!): Job
  jobMetrics(
    id: ID!
    metrics: [String!]
    scopes: [MetricScope!]
    resolution: Int
  ): [JobMetricWithName!]!

  jobStats(id: ID!, metrics: [String!]): [NamedStats!]!

  scopedJobStats(
    id: ID!
    metrics: [String!]
    scopes: [MetricScope!]
  ): [NamedStatsWithScope!]!

  jobs(
    filter: [JobFilter!]
    page: PageRequest
    order: OrderByInput
  ): JobResultList!

  jobsStatistics(
    filter: [JobFilter!]
    metrics: [String!]
    page: PageRequest
    sortBy: SortByAggregate
    groupBy: Aggregate
    numDurationBins: String
    numMetricBins: Int
  ): [JobsStatistics!]!

  jobsMetricStats(filter: [JobFilter!], metrics: [String!]): [JobStats!]!
  jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints

  rooflineHeatmap(
    filter: [JobFilter!]!
    rows: Int!
    cols: Int!
    minX: Float!
    minY: Float!
    maxX: Float!
    maxY: Float!
  ): [[Float!]!]!

  nodeMetrics(
    cluster: String!
    nodes: [String!]
    scopes: [MetricScope!]
    metrics: [String!]
    from: Time!
    to: Time!
  ): [NodeMetrics!]!

  nodeMetricsList(
    cluster: String!
    subCluster: String!
    nodeFilter: String!
    scopes: [MetricScope!]
    metrics: [String!]
    from: Time!
    to: Time!
    page: PageRequest
    resolution: Int
  ): NodesResultList!
}
type Mutation {

@@ -296,38 +380,53 @@ type Mutation {
  updateConfiguration(name: String!, value: String!): String
}

type IntRangeOutput {
  from: Int!
  to: Int!
}

type TimeRangeOutput {
  range: String
  from: Time!
  to: Time!
}

input NodeFilter {
  hostname: StringInput
  cluster: StringInput
  subCluster: StringInput
  nodeState: NodeState
  healthState: MonitoringState
}
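For orientation on the new API surface, here is a minimal client sketch exercising the nodes query together with a NodeFilter. The endpoint path /query, the server address, and the filter values are assumptions for illustration, not part of this commit.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Hypothetical call against the new nodes query; "fritz" is a
    // placeholder cluster name.
    query := `query {
      nodes(filter: [{cluster: {eq: "fritz"}}]) {
        items { hostname nodeState HealthState }
        count
      }
    }`

    body, err := json.Marshal(map[string]string{"query": query})
    if err != nil {
        panic(err)
    }

    // The /query endpoint is an assumption about the server setup.
    resp, err := http.Post("http://localhost:8080/query", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var result map[string]any
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", result)
}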
input JobFilter {
  tags: [ID!]
  dbId: [ID!]
  jobId: StringInput
  arrayJobId: Int
  user: StringInput
  project: StringInput
  jobName: StringInput
  cluster: StringInput
  partition: StringInput
  duration: IntRange
  energy: FloatRange
  minRunningFor: Int
  numNodes: IntRange
  numAccelerators: IntRange
  numHWThreads: IntRange
  startTime: TimeRange
  state: [JobState!]
  metricStats: [MetricStatItem!]
  exclusive: Int
  node: StringInput
}

input OrderByInput {
  field: String!
  type: String!
  order: SortDirectionEnum! = ASC
}
@@ -337,34 +436,46 @@ enum SortDirectionEnum {
}

input StringInput {
  eq: String
  neq: String
  contains: String
  startsWith: String
  endsWith: String
  in: [String!]
}

input IntRange {
  from: Int!
  to: Int!
}

input TimeRange {
  range: String
  from: Time
  to: Time
}

input FloatRange {
  from: Float!
  to: Float!
}

type NodesResultList {
  items: [Node!]!
  count: Int
}

type JobResultList {
  items: [Job!]!
  offset: Int
  limit: Int
  count: Int
  hasNextPage: Boolean
}

type JobLinkResultList {
  listQuery: String
  items: [JobLink!]!
  count: Int
}

type HistoPoint {
@@ -386,27 +497,27 @@ type MetricHistoPoint {
  max: Int
}

type JobsStatistics {
  id: ID!                      # If `groupBy` was used, ID of the user/project/cluster
  name: String!                # if User-Statistics: Given Name of Account (ID) Owner
  totalJobs: Int!              # Number of jobs
  runningJobs: Int!            # Number of running jobs
  shortJobs: Int!              # Number of jobs with a duration of less than duration
  totalWalltime: Int!          # Sum of the duration of all matched jobs in hours
  totalNodes: Int!             # Sum of the nodes of all matched jobs
  totalNodeHours: Int!         # Sum of the node hours of all matched jobs
  totalCores: Int!             # Sum of the cores of all matched jobs
  totalCoreHours: Int!         # Sum of the core hours of all matched jobs
  totalAccs: Int!              # Sum of the accs of all matched jobs
  totalAccHours: Int!          # Sum of the gpu hours of all matched jobs
  histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
  histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
  histNumCores: [HistoPoint!]! # value: number of cores, count: number of jobs with that number of cores
  histNumAccs: [HistoPoint!]!  # value: number of accs, count: number of jobs with that number of accs
  histMetrics: [MetricHistoPoints!]! # metric: metricname, data array of histopoints: value: metric average bin, count: number of jobs with that metric average
}

input PageRequest {
  itemsPerPage: Int!
  page: Int!
}


@@ -62,6 +62,11 @@ models:
    fields:
      partitions:
        resolver: true
  Node:
    model: "github.com/ClusterCockpit/cc-backend/pkg/schema.Node"
    fields:
      metaData:
        resolver: true
  NullableFloat:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.Float" }
  MetricScope:
@@ -81,6 +86,10 @@ models:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.Resource" }
  JobState:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.JobState" }
  MonitoringState:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.NodeState" }
  HealthState:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.MonitoringState" }
  TimeRange:
    { model: "github.com/ClusterCockpit/cc-backend/pkg/schema.TimeRange" }
  IntRange:
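The yml binds the GraphQL Node type to pkg/schema.Node, which is not part of this diff. A plausible shape, inferred from the field and tag usage in internal/repository/node.go below, might look as follows; the exact tag values are assumptions:

// Sketch only: the real definitions live in pkg/schema and may differ.
package schema

type NodeState string

type MonitoringState string

type Node struct {
    ID          int64             `json:"id" db:"id"`
    Hostname    string            `json:"hostname" db:"hostname"`
    Cluster     string            `json:"cluster" db:"cluster"`
    SubCluster  string            `json:"subCluster" db:"subcluster"`
    NodeState   NodeState         `json:"nodeState" db:"node_state"`
    HealthState MonitoringState   `json:"healthState" db:"health_state"`
    RawMetaData []byte            `json:"-" db:"raw_meta_data"`
    MetaData    map[string]string `json:"metaData"`
}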

File diff suppressed because it is too large.


@@ -304,6 +304,21 @@ func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string,
    return nil, nil
}

// NodeState is the resolver for the nodeState field.
func (r *nodeResolver) NodeState(ctx context.Context, obj *schema.Node) (string, error) {
    panic(fmt.Errorf("not implemented: NodeState - nodeState"))
}

// HealthState is the resolver for the HealthState field.
func (r *nodeResolver) HealthState(ctx context.Context, obj *schema.Node) (schema.NodeState, error) {
    panic(fmt.Errorf("not implemented: HealthState - HealthState"))
}

// MetaData is the resolver for the metaData field.
func (r *nodeResolver) MetaData(ctx context.Context, obj *schema.Node) (any, error) {
    panic(fmt.Errorf("not implemented: MetaData - metaData"))
}
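The generated stubs panic until implemented. Since this commit also adds the node repository (see internal/repository/node.go below), the bodies could plausibly delegate like this; a sketch only, not the commit's actual follow-up code. nodeResolver is declared in this same file, so these methods would live alongside the stubs:

package graph

import (
    "context"

    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// Sketch: expose the stored node state as a plain string.
func (r *nodeResolver) NodeState(ctx context.Context, obj *schema.Node) (string, error) {
    return string(obj.NodeState), nil
}

// Sketch: the generated signature returns schema.NodeState because the
// GraphQL MonitoringState scalar is mapped to that model in gqlgen.yml.
func (r *nodeResolver) HealthState(ctx context.Context, obj *schema.Node) (schema.NodeState, error) {
    return schema.NodeState(obj.HealthState), nil
}

// Sketch: delegate to the new repository, which caches per node id.
func (r *nodeResolver) MetaData(ctx context.Context, obj *schema.Node) (any, error) {
    return repository.GetNodeRepository().FetchMetadata(obj)
}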
// Clusters is the resolver for the clusters field.
func (r *queryResolver) Clusters(ctx context.Context) ([]*schema.Cluster, error) {
    return archive.Clusters, nil
@@ -775,6 +790,9 @@ func (r *Resolver) MetricValue() generated.MetricValueResolver { return &metricV
// Mutation returns generated.MutationResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }

// Node returns generated.NodeResolver implementation.
func (r *Resolver) Node() generated.NodeResolver { return &nodeResolver{r} }

// Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
@@ -785,5 +803,6 @@ type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
type metricValueResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type nodeResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver }

internal/repository/node.go (new file, 217 lines)

@@ -0,0 +1,217 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository

import (
    "encoding/json"
    "fmt"
    "maps"
    "sync"
    "time"

    "github.com/ClusterCockpit/cc-backend/pkg/log"
    "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
    sq "github.com/Masterminds/squirrel"
    "github.com/jmoiron/sqlx"
)
var (
    nodeRepoOnce     sync.Once
    nodeRepoInstance *NodeRepository
)

type NodeRepository struct {
    DB        *sqlx.DB
    stmtCache *sq.StmtCache
    cache     *lrucache.Cache
    driver    string
}

// GetNodeRepository returns the process-wide NodeRepository singleton,
// initializing it on first use.
func GetNodeRepository() *NodeRepository {
    nodeRepoOnce.Do(func() {
        db := GetConnection()

        nodeRepoInstance = &NodeRepository{
            DB:     db.DB,
            driver: db.Driver,

            stmtCache: sq.NewStmtCache(db.DB),
            cache:     lrucache.New(1024 * 1024),
        }
    })
    return nodeRepoInstance
}
// FetchMetadata loads the meta_data column for the given node, decodes it,
// stores the result on node.MetaData, and caches it for 24 hours.
func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) {
    start := time.Now()
    cachekey := fmt.Sprintf("metadata:%d", node.ID)
    if cached := r.cache.Get(cachekey, nil); cached != nil {
        node.MetaData = cached.(map[string]string)
        return node.MetaData, nil
    }

    if err := sq.Select("node.meta_data").From("node").Where("node.id = ?", node.ID).
        RunWith(r.stmtCache).QueryRow().Scan(&node.RawMetaData); err != nil {
        log.Warn("Error while scanning for node metadata")
        return nil, err
    }

    if len(node.RawMetaData) == 0 {
        return nil, nil
    }

    if err := json.Unmarshal(node.RawMetaData, &node.MetaData); err != nil {
        log.Warn("Error while unmarshaling raw metadata json")
        return nil, err
    }

    r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
    log.Debugf("Timer FetchMetadata %s", time.Since(start))
    return node.MetaData, nil
}
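FetchMetadata memoizes per node id with a 24-hour TTL, so repeated calls within that window hit the database only once. A hypothetical usage sketch (runnable only from inside the cc-backend module, since the package is internal; the id and key are example values):

package main

import (
    "fmt"

    "github.com/ClusterCockpit/cc-backend/internal/repository"
)

func main() {
    repo := repository.GetNodeRepository()

    node, err := repo.GetNode(42, false) // id 42 is an example value
    if err != nil {
        panic(err)
    }

    meta, _ := repo.FetchMetadata(node) // first call: reads node.meta_data from the DB
    meta, _ = repo.FetchMetadata(node)  // second call within 24h: served from the LRU cache
    fmt.Println(meta["note"])           // "note" is a made-up example key
}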
// UpdateMetadata sets key to val in the node's metadata map and persists the
// updated JSON blob, invalidating and then refreshing the cache entry.
func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err error) {
    cachekey := fmt.Sprintf("metadata:%d", node.ID)
    r.cache.Del(cachekey)
    if node.MetaData == nil {
        if _, err = r.FetchMetadata(node); err != nil {
            log.Warnf("Error while fetching metadata for node, DB ID '%v'", node.ID)
            return err
        }
    }

    if node.MetaData != nil {
        cpy := make(map[string]string, len(node.MetaData)+1)
        maps.Copy(cpy, node.MetaData)
        cpy[key] = val
        node.MetaData = cpy
    } else {
        node.MetaData = map[string]string{key: val}
    }

    if node.RawMetaData, err = json.Marshal(node.MetaData); err != nil {
        log.Warnf("Error while marshaling metadata for node, DB ID '%v'", node.ID)
        return err
    }

    if _, err = sq.Update("node").
        Set("meta_data", node.RawMetaData).
        Where("node.id = ?", node.ID).
        RunWith(r.stmtCache).Exec(); err != nil {
        log.Warnf("Error while updating metadata for node, DB ID '%v'", node.ID)
        return err
    }

    r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
    return nil
}
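UpdateMetadata copies the map before mutating it, so maps previously handed out by FetchMetadata, including the cached one, are never modified in place. A hypothetical helper built on it; the key and value are placeholders:

package nodesutil

import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// markForMaintenance is a hypothetical helper built on UpdateMetadata. The
// copy-on-write inside UpdateMetadata keeps previously returned maps intact.
func markForMaintenance(repo *repository.NodeRepository, node *schema.Node) error {
    return repo.UpdateMetadata(node, "note", "scheduled for maintenance")
}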
// GetNode loads a single node by database id; when withMeta is set, the
// node's metadata map is fetched as well.
func (r *NodeRepository) GetNode(id int64, withMeta bool) (*schema.Node, error) {
    node := &schema.Node{}
    if err := sq.Select("id", "hostname", "cluster", "subcluster", "node_state",
        "health_state").From("node").
        Where("node.id = ?", id).RunWith(r.DB).
        QueryRow().Scan(&node.ID, &node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState,
        &node.HealthState); err != nil {
        log.Warnf("Error while querying node '%v' from database", id)
        return nil, err
    }

    if withMeta {
        var err error
        var meta map[string]string
        if meta, err = r.FetchMetadata(node); err != nil {
            log.Warnf("Error while fetching metadata for node '%v'", id)
            return nil, err
        }
        node.MetaData = meta
    }

    return node, nil
}
// NamedNodeInsert is the statement used by AddNode; the meta_data column
// matches the one read by FetchMetadata and written by UpdateMetadata.
const NamedNodeInsert string = `
INSERT INTO node (hostname, cluster, subcluster, node_state, health_state, meta_data)
VALUES (:hostname, :cluster, :subcluster, :node_state, :health_state, :raw_meta_data);`

// AddNode inserts a new node row and returns the generated database id.
func (r *NodeRepository) AddNode(node *schema.Node) (int64, error) {
    var err error

    node.RawMetaData, err = json.Marshal(node.MetaData)
    if err != nil {
        log.Errorf("Error while marshaling metadata for node '%v'", node.Hostname)
        return 0, err
    }

    res, err := r.DB.NamedExec(NamedNodeInsert, node)
    if err != nil {
        log.Errorf("Error while adding node '%v' to database", node.Hostname)
        return 0, err
    }
    node.ID, err = res.LastInsertId()
    if err != nil {
        log.Errorf("Error while getting last insert id for node '%v' from database", node.Hostname)
        return 0, err
    }

    return node.ID, nil
}
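A sketch of registering a node through AddNode; the hostname, cluster, subcluster, and state strings are placeholder values, since the legal NodeState and MonitoringState constants live in pkg/schema, outside this diff:

package nodesutil

import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// registerNode is a hypothetical helper; all literal values are placeholders.
func registerNode() (int64, error) {
    node := &schema.Node{
        Hostname:    "f0101",
        Cluster:     "fritz",
        SubCluster:  "main",
        NodeState:   schema.NodeState("idle"),
        HealthState: schema.MonitoringState("full"),
        MetaData:    map[string]string{"rack": "r01"},
    }
    // AddNode marshals MetaData into RawMetaData, inserts the row, and
    // returns the generated database id.
    return repository.GetNodeRepository().AddNode(node)
}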
// UpdateNodeState sets the node_state column for the node with the given id.
func (r *NodeRepository) UpdateNodeState(id int64, nodeState *schema.NodeState) error {
    if _, err := sq.Update("node").Set("node_state", nodeState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
        log.Errorf("error while updating node state for node '%d'", id)
        return err
    }

    return nil
}

// UpdateHealthState sets the health_state column for the node with the given id.
func (r *NodeRepository) UpdateHealthState(id int64, healthState *schema.MonitoringState) error {
    if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
        log.Errorf("error while updating health state for node '%d'", id)
        return err
    }

    return nil
}
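Both state setters take a pointer to the new state, which squirrel passes through as a single bind parameter (database/sql dereferences pointer arguments). A usage sketch with a placeholder state value:

package nodesutil

import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// setAllocated is a hypothetical helper; "allocated" is a placeholder value.
func setAllocated(id int64) error {
    state := schema.NodeState("allocated")
    return repository.GetNodeRepository().UpdateNodeState(id, &state)
}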
// DeleteNode removes the node with the given id from the database.
func (r *NodeRepository) DeleteNode(id int64) error {
    _, err := r.DB.Exec(`DELETE FROM node WHERE node.id = ?`, id)
    if err != nil {
        log.Errorf("Error while deleting node '%d' from DB", id)
        return err
    }
    log.Infof("deleted node '%d' from DB", id)
    return nil
}

// QueryNodes is a stub; filtered node queries are not implemented yet.
func (r *NodeRepository) QueryNodes() ([]*schema.Node, error) {
    return nil, nil
}
// ListNodes returns all nodes of the given cluster, ordered by hostname.
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
    q := sq.Select("hostname", "cluster", "subcluster", "node_state",
        "health_state").From("node").Where("node.cluster = ?", cluster).OrderBy("node.hostname ASC")

    rows, err := q.RunWith(r.DB).Query()
    if err != nil {
        log.Warn("Error while querying node list")
        return nil, err
    }
    nodeList := make([]*schema.Node, 0, 100)
    defer rows.Close()
    for rows.Next() {
        node := &schema.Node{}
        if err := rows.Scan(&node.Hostname, &node.Cluster,
            &node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
            log.Warn("Error while scanning node list")
            return nil, err
        }

        nodeList = append(nodeList, node)
    }

    return nodeList, nil
}
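Finally, a sketch tying the pieces together: list a cluster's nodes and print their states. Note that ListNodes does not select the id column, so the returned nodes cannot be fed directly into the metadata helpers, which key their cache on node.ID. The cluster name is an example value:

package main

import (
    "fmt"

    "github.com/ClusterCockpit/cc-backend/internal/repository"
)

func main() {
    repo := repository.GetNodeRepository()

    nodes, err := repo.ListNodes("fritz") // placeholder cluster name
    if err != nil {
        panic(err)
    }
    for _, n := range nodes {
        fmt.Printf("%s: state=%s health=%s\n", n.Hostname, n.NodeState, n.HealthState)
    }
}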