diff --git a/internal/repository/migrations/sqlite3/10_node-table.up.sql b/internal/repository/migrations/sqlite3/10_node-table.up.sql index fd118f5d..b788a8a9 100644 --- a/internal/repository/migrations/sqlite3/10_node-table.up.sql +++ b/internal/repository/migrations/sqlite3/10_node-table.up.sql @@ -38,6 +38,7 @@ CREATE INDEX IF NOT EXISTS nodestates_state_timestamp ON node_state (node_state, CREATE INDEX IF NOT EXISTS nodestates_health_timestamp ON node_state (health_state, time_stamp); CREATE INDEX IF NOT EXISTS nodestates_nodeid_state ON node_state (node_id, node_state); CREATE INDEX IF NOT EXISTS nodestates_nodeid_health ON node_state (node_id, health_state); +CREATE INDEX IF NOT EXISTS nodestates_nodeid_timestamp ON node_state (node_id, time_stamp DESC); -- Add NEW Indices For Increased Amounts of Tags CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id); diff --git a/internal/repository/node.go b/internal/repository/node.go index 08a694c6..2ffe6698 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -52,6 +52,38 @@ func GetNodeRepository() *NodeRepository { return nodeRepoInstance } +// latestStateCondition returns a squirrel expression that restricts node_state +// rows to the latest per node_id using a correlated subquery. +// Requires the query to join node and node_state tables. +func latestStateCondition() sq.Sqlizer { + return sq.Expr( + "node_state.id = (SELECT ns2.id FROM node_state ns2 WHERE ns2.node_id = node.id ORDER BY ns2.time_stamp DESC LIMIT 1)", + ) +} + +// applyNodeFilters applies common NodeFilter conditions to a query that joins +// the node and node_state tables with latestStateCondition. +func applyNodeFilters(query sq.SelectBuilder, filters []*model.NodeFilter) sq.SelectBuilder { + for _, f := range filters { + if f.Cluster != nil { + query = buildStringCondition("node.cluster", f.Cluster, query) + } + if f.SubCluster != nil { + query = buildStringCondition("node.subcluster", f.SubCluster, query) + } + if f.Hostname != nil { + query = buildStringCondition("node.hostname", f.Hostname, query) + } + if f.SchedulerState != nil { + query = query.Where("node_state.node_state = ?", f.SchedulerState) + } + if f.HealthState != nil { + query = query.Where("node_state.health_state = ?", f.HealthState) + } + } + return query +} + func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[string]string, error) { start := time.Now() @@ -82,17 +114,16 @@ func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[str func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error) { node := &schema.Node{} - var timestamp int - if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). - From("node_state"). - Join("node ON node_state.node_id = node.id"). + if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). + From("node"). + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.hostname = ?", hostname). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). RunWith(r.DB). - QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while querying node '%s' at time '%d' from database: %v", hostname, timestamp, err) + QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warnf("Error while querying node '%s' from database: %v", hostname, err) return nil, err } @@ -111,16 +142,15 @@ func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) func (r *NodeRepository) GetNodeByID(id int64, withMeta bool) (*schema.Node, error) { node := &schema.Node{} - var timestamp int - if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). - From("node_state"). - Join("node ON node_state.node_id = node.id"). + if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). + From("node"). + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.id = ?", id). - GroupBy("node_state.node_id"). RunWith(r.DB). - QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while querying node ID '%d' at time '%d' from database: %v", id, timestamp, err) + QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warnf("Error while querying node ID '%d' from database: %v", id, err) return nil, err } @@ -313,40 +343,17 @@ func (r *NodeRepository) QueryNodes( order *model.OrderByInput, // Currently unused! ) ([]*schema.Node, error) { query, qerr := AccessCheck(ctx, - sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time"). + sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). From("node"). - Join("node_state ON node_state.node_id = node.id")) + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition())) if qerr != nil { return nil, qerr } - for _, f := range filters { - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - } - - query = query.GroupBy("node_id").OrderBy("hostname ASC") + query = applyNodeFilters(query, filters) + query = query.OrderBy("node.hostname ASC") if page != nil && page.ItemsPerPage != -1 { limit := uint64(page.ItemsPerPage) @@ -363,11 +370,10 @@ func (r *NodeRepository) QueryNodes( nodes := make([]*schema.Node, 0) for rows.Next() { node := schema.Node{} - var timestamp int if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster, - &node.NodeState, &node.HealthState, ×tamp); err != nil { + &node.NodeState, &node.HealthState); err != nil { rows.Close() - cclog.Warnf("Error while scanning rows (QueryNodes) at time '%d'", timestamp) + cclog.Warn("Error while scanning rows (QueryNodes)") return nil, err } nodes = append(nodes, &node) @@ -377,74 +383,39 @@ func (r *NodeRepository) QueryNodes( } // CountNodes returns the total matched nodes based on a node filter. It always operates -// on the last state (largest timestamp). +// on the last state (largest timestamp) per node. func (r *NodeRepository) CountNodes( ctx context.Context, filters []*model.NodeFilter, ) (int, error) { query, qerr := AccessCheck(ctx, - sq.Select("time_stamp", "count(*) as countRes"). + sq.Select("COUNT(*)"). From("node"). - Join("node_state ON node_state.node_id = node.id")) + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition())) if qerr != nil { return 0, qerr } - for _, f := range filters { - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - } + query = applyNodeFilters(query, filters) - query = query.GroupBy("time_stamp").OrderBy("time_stamp DESC").Limit(1) - - rows, err := query.RunWith(r.stmtCache).Query() - if err != nil { + var count int + if err := query.RunWith(r.stmtCache).QueryRow().Scan(&count); err != nil { queryString, queryVars, _ := query.ToSql() cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) return 0, err } - var totalNodes int - for rows.Next() { - var timestamp int - if err := rows.Scan(×tamp, &totalNodes); err != nil { - rows.Close() - cclog.Warnf("Error while scanning rows (CountNodes) at time '%d'", timestamp) - return 0, err - } - } - - return totalNodes, nil + return count, nil } func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { - q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). + q := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). From("node"). Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). OrderBy("node.hostname ASC") rows, err := q.RunWith(r.DB).Query() @@ -456,10 +427,9 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { defer rows.Close() for rows.Next() { node := &schema.Node{} - var timestamp int if err := rows.Scan(&node.Hostname, &node.Cluster, - &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while scanning node list (ListNodes) at time '%d'", timestamp) + &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warn("Error while scanning node list (ListNodes)") return nil, err } @@ -470,11 +440,11 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { } func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { - q := sq.Select("node.hostname", "node_state.node_state", "MAX(node_state.time_stamp) as time"). + q := sq.Select("node.hostname", "node_state.node_state"). From("node"). Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). OrderBy("node.hostname ASC") rows, err := q.RunWith(r.DB).Query() @@ -487,9 +457,8 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { defer rows.Close() for rows.Next() { var hostname, nodestate string - var timestamp int - if err := rows.Scan(&hostname, &nodestate, ×tamp); err != nil { - cclog.Warnf("Error while scanning node list (MapNodes) at time '%d'", timestamp) + if err := rows.Scan(&hostname, &nodestate); err != nil { + cclog.Warn("Error while scanning node list (MapNodes)") return nil, err } @@ -500,33 +469,16 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { } func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error) { - query, qerr := AccessCheck(ctx, sq.Select("hostname", column, "MAX(time_stamp) as time").From("node")) + query, qerr := AccessCheck(ctx, + sq.Select(column). + From("node"). + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition())) if qerr != nil { return nil, qerr } - query = query.Join("node_state ON node_state.node_id = node.id") - - for _, f := range filters { - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - } - } - - // Add Group and Order - query = query.GroupBy("hostname").OrderBy("hostname DESC") + query = applyNodeFilters(query, filters) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { @@ -537,12 +489,10 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF stateMap := map[string]int{} for rows.Next() { - var hostname, state string - var timestamp int - - if err := rows.Scan(&hostname, &state, ×tamp); err != nil { + var state string + if err := rows.Scan(&state); err != nil { rows.Close() - cclog.Warnf("Error while scanning rows (CountStates) at time '%d'", timestamp) + cclog.Warn("Error while scanning rows (CountStates)") return nil, err } @@ -735,26 +685,14 @@ func (r *NodeRepository) GetNodesForList( } } else { - // DB Nodes: Count and Find Next Page + // DB Nodes: Count and derive hasNextPage from count var cerr error countNodes, cerr = r.CountNodes(ctx, queryFilters) if cerr != nil { cclog.Warn("error while counting node database data (Resolver.NodeMetricsList)") return nil, nil, 0, false, cerr } - - // Example Page 4 @ 10 IpP : Does item 41 exist? - // Minimal Page 41 @ 1 IpP : If len(result) is 1, Page 5 exists. - nextPage := &model.PageRequest{ - ItemsPerPage: 1, - Page: ((page.Page * page.ItemsPerPage) + 1), - } - nextNodes, err := r.QueryNodes(ctx, queryFilters, nextPage, nil) // Order not Used - if err != nil { - cclog.Warn("Error while querying next nodes") - return nil, nil, 0, false, err - } - hasNextPage = len(nextNodes) == 1 + hasNextPage = page.Page*page.ItemsPerPage < countNodes } // Fallback for non-init'd node table in DB; Ignores stateFilter