fix: Add defer.close for all queries

This commit is contained in:
2026-03-11 05:04:20 +01:00
parent f9aa47ea1c
commit 5d3d77620e
12 changed files with 709 additions and 35 deletions

View File

@@ -240,7 +240,6 @@ func (r *NodeRepository) QueryNodes(
page *model.PageRequest,
order *model.OrderByInput, // Currently unused!
) ([]*schema.Node, error) {
query, qerr := AccessCheck(ctx,
sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time").
From("node").
@@ -286,6 +285,7 @@ func (r *NodeRepository) QueryNodes(
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
nodes := make([]*schema.Node, 0)
for rows.Next() {
@@ -300,6 +300,89 @@ func (r *NodeRepository) QueryNodes(
nodes = append(nodes, &node)
}
if err := rows.Err(); err != nil {
return nil, err
}
return nodes, nil
}
// QueryNodesWithMeta returns a list of nodes based on a node filter. It always operates
// on the last state (largest timestamp). It includes both (!) optional JSON column data
func (r *NodeRepository) QueryNodesWithMeta(
ctx context.Context,
filters []*model.NodeFilter,
page *model.PageRequest,
order *model.OrderByInput, // Currently unused!
) ([]*schema.Node, error) {
query, qerr := AccessCheck(ctx,
sq.Select("node.hostname", "node.cluster", "node.subcluster",
"node_state.node_state", "node_state.health_state",
"node.meta_data", "node_state.health_metrics").
From("node").
Join("node_state ON node_state.node_id = node.id").
Where(latestStateCondition()))
if qerr != nil {
return nil, qerr
}
query = applyNodeFilters(query, filters)
query = query.OrderBy("node.hostname ASC")
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
queryString, queryVars, _ := query.ToSql()
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
nodes := make([]*schema.Node, 0)
for rows.Next() {
node := schema.Node{}
RawMetaData := make([]byte, 0)
RawMetricHealth := make([]byte, 0)
if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
&node.NodeState, &node.HealthState, &RawMetaData, &RawMetricHealth); err != nil {
cclog.Warn("Error while scanning rows (QueryNodes)")
return nil, err
}
if len(RawMetaData) == 0 {
node.MetaData = nil
} else {
metaData := make(map[string]string)
if err := json.Unmarshal(RawMetaData, &metaData); err != nil {
cclog.Warn("Error while unmarshaling raw metadata json")
return nil, err
}
node.MetaData = metaData
}
if len(RawMetricHealth) == 0 {
node.HealthData = nil
} else {
healthData := make(map[string][]string)
if err := json.Unmarshal(RawMetricHealth, &healthData); err != nil {
cclog.Warn("Error while unmarshaling raw healthdata json")
return nil, err
}
node.HealthData = healthData
}
nodes = append(nodes, &node)
}
if err := rows.Err(); err != nil {
return nil, err
}
return nodes, nil
}
@@ -309,7 +392,6 @@ func (r *NodeRepository) CountNodes(
ctx context.Context,
filters []*model.NodeFilter,
) (int, error) {
query, qerr := AccessCheck(ctx,
sq.Select("time_stamp", "count(*) as countRes").
From("node").
@@ -426,7 +508,12 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) {
}
func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error) {
query, qerr := AccessCheck(ctx, sq.Select("hostname", column, "MAX(time_stamp) as time").From("node"))
query, qerr := AccessCheck(ctx,
sq.Select(column, "COUNT(*) as count").
From("node").
Join("node_state ON node_state.node_id = node.id").
Where(latestStateCondition()).
GroupBy(column))
if qerr != nil {
return nil, qerr
}
@@ -460,25 +547,21 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
stateMap := map[string]int{}
for rows.Next() {
var hostname, state string
var timestamp int
if err := rows.Scan(&hostname, &state, &timestamp); err != nil {
rows.Close()
cclog.Warnf("Error while scanning rows (CountStates) at time '%d'", timestamp)
return nil, err
}
stateMap[state] += 1
}
defer rows.Close()
nodes := make([]*model.NodeStates, 0)
for state, counts := range stateMap {
node := model.NodeStates{State: state, Count: counts}
nodes = append(nodes, &node)
for rows.Next() {
var state string
var count int
if err := rows.Scan(&state, &count); err != nil {
cclog.Warn("Error while scanning rows (CountStates)")
return nil, err
}
nodes = append(nodes, &model.NodeStates{State: state, Count: count})
}
if err := rows.Err(); err != nil {
return nil, err
}
return nodes, nil
@@ -524,6 +607,7 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
rawData := make(map[string][][]int)
for rows.Next() {
@@ -531,7 +615,6 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
var timestamp, count int
if err := rows.Scan(&state, &timestamp, &count); err != nil {
rows.Close()
cclog.Warnf("Error while scanning rows (CountStatesTimed) at time '%d'", timestamp)
return nil, err
}
@@ -544,6 +627,10 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
rawData[state][1] = append(rawData[state][1], count)
}
if err := rows.Err(); err != nil {
return nil, err
}
timedStates := make([]*model.NodeStatesTimed, 0)
for state, data := range rawData {
entry := model.NodeStatesTimed{State: state, Times: data[0], Counts: data[1]}

View File

@@ -159,6 +159,7 @@ func (r *JobRepository) JobsStatsGrouped(
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
stats := make([]*model.JobsStatistics, 0, 100)
@@ -244,6 +245,10 @@ func (r *JobRepository) JobsStatsGrouped(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
return stats, nil
}
@@ -329,6 +334,7 @@ func (r *JobRepository) JobCountGrouped(
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
stats := make([]*model.JobsStatistics, 0, 100)
@@ -348,6 +354,10 @@ func (r *JobRepository) JobCountGrouped(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer JobCountGrouped %s", time.Since(start))
return stats, nil
}
@@ -371,6 +381,7 @@ func (r *JobRepository) AddJobCountGrouped(
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
counts := make(map[string]int)
@@ -386,6 +397,10 @@ func (r *JobRepository) AddJobCountGrouped(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
switch kind {
case "running":
for _, s := range stats {
@@ -413,23 +428,13 @@ func (r *JobRepository) AddJobCount(
if err != nil {
return nil, err
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
cclog.Warn("Error while querying DB for job statistics")
var cnt sql.NullInt64
if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
cclog.Warn("Error while querying DB for job count")
return nil, err
}
var count int
for rows.Next() {
var cnt sql.NullInt64
if err := rows.Scan(&cnt); err != nil {
cclog.Warn("Error while scanning rows")
return nil, err
}
count = int(cnt.Int64)
}
count := int(cnt.Int64)
switch kind {
case "running":
@@ -567,6 +572,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
cclog.Error("Error while running query")
return nil, err
}
defer rows.Close()
points := make([]*model.HistoPoint, 0)
// is it possible to introduce zero values here? requires info about bincount
@@ -579,6 +585,11 @@ func (r *JobRepository) jobsStatisticsHistogram(
points = append(points, &point)
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
@@ -614,6 +625,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
cclog.Error("Error while running query")
return nil, err
}
defer rows.Close()
// Fill Array at matching $Value
for rows.Next() {
@@ -634,6 +646,10 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
@@ -716,6 +732,7 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
cclog.Errorf("Error while running mainQuery: %s", err)
return nil, err
}
defer rows.Close()
// Setup Return Array With Bin-Numbers for Match and Min/Max based on Peak
points := make([]*model.MetricHistoPoint, 0)
@@ -751,6 +768,10 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}
cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))

View File

@@ -205,6 +205,7 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
if err != nil {
return nil, nil, err
}
defer xrows.Close()
for xrows.Next() {
var t schema.Tag
@@ -222,6 +223,10 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
}
}
if err := xrows.Err(); err != nil {
return nil, nil, err
}
// Query and Count Jobs with attached Tags
q := sq.Select("t.tag_name, t.id, count(jt.tag_id)").
From("tag t").
@@ -256,6 +261,7 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
if err != nil {
return nil, nil, err
}
defer rows.Close()
counts = make(map[string]int)
for rows.Next() {
@@ -405,6 +411,7 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
cclog.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}
defer rows.Close()
tags := make([]*schema.Tag, 0)
for rows.Next() {
@@ -423,6 +430,10 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return tags, nil
}
@@ -438,6 +449,7 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
cclog.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}
defer rows.Close()
tags := make([]*schema.Tag, 0)
for rows.Next() {
@@ -449,6 +461,10 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
tags = append(tags, tag)
}
if err := rows.Err(); err != nil {
return nil, err
}
return tags, nil
}
@@ -465,6 +481,7 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
cclog.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}
defer rows.Close()
tags := make([]*schema.Tag, 0)
for rows.Next() {
@@ -476,6 +493,10 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
tags = append(tags, tag)
}
if err := rows.Err(); err != nil {
return nil, err
}
return tags, nil
}