Merge pull request #93 from ClusterCockpit/sql-repository-opt

Refactor SQL repository  and add migration support
This commit is contained in:
Jan Eitzinger 2023-02-21 16:23:59 +01:00 committed by GitHub
commit 33d77d6ef1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 2016 additions and 512 deletions

View File

@ -30,6 +30,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
$(TARGET): $(VAR) $(SVELTE_TARGETS)
$(info ===> BUILD cc-backend)
@go build -ldflags=${LD_FLAGS} ./cmd/cc-backend
./cc-backend --migrate-db
clean:
$(info ===> CLEAN)
@ -48,5 +49,4 @@ $(SVELTE_TARGETS): $(SVELTE_SRC)
$(VAR):
@mkdir $(VAR)
@touch ./var/job.db
cd web/frontend && yarn install

View File

@ -98,6 +98,15 @@ A config file in the JSON format has to be provided using `--config` to override
By default, if there is a `config.json` file in the current directory of the `cc-backend` process, it will be loaded even without the `--config` flag.
You find documentation of all supported configuration and command line options [here](./configs.README.md).
## Database initialization and migration
Every cc-backend version supports a specific database version.
On startup the version of the sqlite database is validated and cc-backend will terminate if the version does not match.
cc-backend supports to migrate the database schema up to the required version using the `--migrate-db` command line option.
In case the database file does not yet exist it is created and initialized by the `--migrate-db` command line option.
In case you want to use a newer database version with an olden version of cc-backend you can downgrade a database using the external [migrate](https://github.com/golang-migrate/migrate) tool.
In this case you have to provide the path to the migration files in a recent source tree: `./internal/repository/migrations/`.
## Development
In case the REST or GraphQL API is changed the according code generators have to be used.

View File

@ -61,7 +61,7 @@ var (
)
func main() {
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev, flagVersion, flagLogDateTime bool
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagDev, flagVersion, flagLogDateTime bool
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
@ -69,13 +69,14 @@ func main() {
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI")
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,support,api,user]:<password>`")
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
flag.StringVar(&flagLogLevel, "loglevel", "debug", "Sets the logging level: `[debug (default),info,notice,warn,err,fatal,crit]`")
flag.StringVar(&flagLogLevel, "loglevel", "debug", "Sets the logging level: `[debug (default),info,warn,err,fatal,crit]`")
flag.Parse()
if flagVersion {
@ -112,6 +113,11 @@ func main() {
config.Keys.DB = os.Getenv(envvar)
}
if flagMigrateDB {
repository.MigrateDB(config.Keys.DBDriver, config.Keys.DB)
os.Exit(0)
}
repository.Connect(config.Keys.DBDriver, config.Keys.DB)
db := repository.GetConnection()

View File

@ -40,15 +40,15 @@ Start by creating a base folder for all of the following steps.
- `cd ../..`
* Build Go Executable
- `go build ./cmd/cc-backend/`
* Prepare Datafolder and Database file
- `mkdir var`
- `touch var/job.db`
* Activate & Config environment for cc-backend
- `cp configs/env-template.txt .env`
- Optional: Have a look via `vim ./.env`
- Copy the `config.json` file included in this tarball into the root directory of cc-backend: `cp ../../config.json ./`
* Back to toplevel `clustercockpit`
- `cd ..`
* Prepare Datafolder and Database file
- `mkdir var`
- `./cc-backend --migrate-db`
### Setup cc-metric-store
* Clone Repository

View File

@ -1,5 +1,5 @@
{
"addr": "0.0.0.0:8080",
"addr": "127.0.0.1:8080",
"archive": {
"kind": "file",
"path": "./var/job-archive"

12
go.mod
View File

@ -1,6 +1,6 @@
module github.com/ClusterCockpit/cc-backend
go 1.17
go 1.18
require (
github.com/99designs/gqlgen v0.17.16
@ -39,10 +39,13 @@ require (
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/spec v0.20.7 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/golang-migrate/migrate/v4 v4.15.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
github.com/josharian/intern v1.0.0 // indirect
@ -51,7 +54,7 @@ require (
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@ -59,17 +62,20 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/qustavo/sqlhooks/v2 v2.1.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/swaggo/files v0.0.0-20220728132757-551d4a08d97a // indirect
github.com/urfave/cli/v2 v2.8.1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sys v0.0.0-20220913175220-63ea55921009 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.6 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

1284
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -85,18 +85,6 @@ func Init(db *sqlx.DB,
configs map[string]interface{}) (*Authentication, error) {
auth := &Authentication{}
auth.db = db
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS user (
username varchar(255) PRIMARY KEY NOT NULL,
password varchar(255) DEFAULT NULL,
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
name varchar(255) DEFAULT NULL,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);`)
if err != nil {
log.Error("Error while initializing authentication -> create user table failed")
return nil, err
}
sessKey := os.Getenv("SESSION_KEY")
if sessKey == "" {

View File

@ -44,7 +44,7 @@ func (la *LdapAuthenticator) Init(
}
if interval == 0 {
log.Note("Sync interval is zero")
log.Info("Sync interval is zero")
return nil
}

View File

@ -19,7 +19,7 @@ func TestInit(t *testing.T) {
func TestInitMinimal(t *testing.T) {
fp := "../../docs/config.json"
Init(fp)
if Keys.Addr != "0.0.0.0:8080" {
t.Errorf("wrong addr\ngot: %s \nwant: 0.0.0.0:8080", Keys.Addr)
if Keys.Addr != "127.0.0.1:8080" {
t.Errorf("wrong addr\ngot: %s \nwant: 127.0.0.1:8080", Keys.Addr)
}
}

View File

@ -238,7 +238,7 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
return r.jobsStatistics(ctx, filter, groupBy)
return r.Repo.JobsStatistics(ctx, filter, groupBy)
}
// JobsCount is the resolver for the jobsCount field.

View File

@ -6,220 +6,16 @@ package graph
import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"time"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
const ShortJobDuration int = 5 * 60
// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map[string]*model.JobsStatistics{}
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range archive.Clusters {
for _, subcluster := range cluster.SubClusters {
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)
var query sq.SelectBuilder
if groupBy == nil {
query = sq.Select(
"''",
"COUNT(job.id)",
"CAST(ROUND(SUM(job.duration) / 3600) as int)",
corehoursCol,
).From("job")
} else {
col := groupBy2column[*groupBy]
query = sq.Select(
col,
"COUNT(job.id)",
"CAST(ROUND(SUM(job.duration) / 3600) as int)",
corehoursCol,
).From("job").GroupBy(col)
}
query = query.
Where("job.cluster = ?", cluster.Name).
Where("job.subcluster = ?", subcluster.Name)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
}
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
log.Warn("Error while scanning rows for short job stats")
return nil, err
}
} else {
col := groupBy2column[*groupBy]
query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying jobs for short jobs")
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
log.Warn("Error while scanning rows for short jobs")
return nil, err
}
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
if col == "job.user" {
for id, _ := range stats {
emptyDash := "-"
name, _ := repository.GetJobRepository().FindNameByUser(ctx, id)
if name != "" {
stats[id].Name = &name
} else {
stats[id].Name = &emptyDash
}
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver can not be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histDuration" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value`, time.Now().Unix())
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
}
}
return res, nil
}
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
query = repository.SecurityCheck(ctx, query)
for _, f := range filters {
query = repository.BuildWhereClause(f, query)
}
if len(id) != 0 && len(col) != 0 {
query = query.Where(col+" = ?", id)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
points := make([]*model.HistoPoint, 0)
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
points = append(points, &point)
}
return points, nil
}
const MAX_JOBS_FOR_ANALYSIS = 500
// Helper function for the rooflineHeatmap GraphQL query placed here so that schema.resolvers.go is not too full.

View File

@ -16,8 +16,8 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type CCMetricStoreConfig struct {
@ -283,7 +283,7 @@ func (ccms *CCMetricStore) buildQueries(
mc := archive.GetMetricConfig(job.Cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
log.Notef("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
log.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
continue
}

View File

@ -14,8 +14,8 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
)
@ -98,10 +98,10 @@ func (idb *InfluxDBv2DataRepository) LoadData(
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
measurementsCond, hostsCond)
case "socket":
log.Note("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
log.Info("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
continue
case "core":
log.Note(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
log.Info(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
continue
// Get Finest Granularity only, Set NULL to 0.0
// query = fmt.Sprintf(`
@ -115,7 +115,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
default:
log.Notef("Unknown scope '%s' requested: Will return 'node' scope.", scope)
log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'")
}
@ -194,7 +194,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// hostSeries.Data = append(hostSeries.Data, schema.Float(val))
// }
default:
log.Notef("Unknown scope '%s' requested: Will return 'node' scope.", scope)
log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'")
}
@ -324,7 +324,7 @@ func (idb *InfluxDBv2DataRepository) LoadNodeData(
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
// TODO : Implement to be used in Analysis- und System/Node-View
log.Notef("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
log.Infof("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
}

View File

@ -5,24 +5,24 @@
package metricdata
import (
"os"
"errors"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"text/template"
"bytes"
"net/http"
"time"
"math"
"sort"
"net/http"
"os"
"regexp"
"sort"
"strings"
"sync"
"text/template"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
promcfg "github.com/prometheus/common/config"
@ -60,7 +60,6 @@ func contains(s []schema.MetricScope, str schema.MetricScope) bool {
return false
}
func MinMaxMean(data []schema.Float) (float64, float64, float64) {
if len(data) == 0 {
return 0.0, 0.0, 0.0
@ -75,13 +74,16 @@ func MinMaxMean(data []schema.Float) (float64, float64, float64) {
}
sum += float64(val)
n += 1
if float64(val) > max {max = float64(val)}
if float64(val) < min {min = float64(val)}
if float64(val) > max {
max = float64(val)
}
if float64(val) < min {
min = float64(val)
}
}
return min, max, sum / n
}
// Rewritten from
// https://github.com/ermanh/trieregex/blob/master/trieregex/trieregex.py
func nodeRegex(nodes []string) string {
@ -90,7 +92,9 @@ func nodeRegex(nodes []string) string {
for _, node := range nodes {
_trie := root
for _, c := range node {
if _, ok := _trie[c]; !ok {_trie[c] = Trie{}}
if _, ok := _trie[c]; !ok {
_trie[c] = Trie{}
}
_trie = _trie[c]
}
_trie['*'] = Trie{}
@ -106,14 +110,16 @@ func nodeRegex(nodes []string) string {
}
if len(trie) == 1 {
for key, _trie := range trie {
if key == '*' { return "" }
if key == '*' {
return ""
}
return regexp.QuoteMeta(string(key)) + trieRegex(_trie, false)
}
} else {
sequences := []string{}
for key, _trie := range trie {
if key != '*' {
sequences = append(sequences, regexp.QuoteMeta(string(key)) + trieRegex(_trie, false))
sequences = append(sequences, regexp.QuoteMeta(string(key))+trieRegex(_trie, false))
}
}
sort.Slice(sequences, func(i, j int) bool {
@ -129,7 +135,7 @@ func nodeRegex(nodes []string) string {
// multiple edges, each length 1
} else if s := strings.Join(sequences, ""); len(s) == len(sequences) {
// char or numeric range
if len(s)-1 == int(s[len(s)-1]) - int(s[0]) {
if len(s)-1 == int(s[len(s)-1])-int(s[0]) {
result = fmt.Sprintf("[%c-%c]", s[0], s[len(s)-1])
// char or numeric set
} else {
@ -139,7 +145,9 @@ func nodeRegex(nodes []string) string {
} else {
result = "(?:" + strings.Join(sequences, "|") + ")"
}
if _, ok := trie['*']; ok { result += "?"}
if _, ok := trie['*']; ok {
result += "?"
}
return result
}
return ""
@ -147,9 +155,6 @@ func nodeRegex(nodes []string) string {
return trieRegex(root, true)
}
func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
var config PrometheusDataRepositoryConfig
// parse config
@ -194,9 +199,6 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
return nil
}
// TODO: respect scope argument
func (pdb *PrometheusDataRepository) FormatQuery(
metric string,
@ -226,19 +228,16 @@ func (pdb *PrometheusDataRepository) FormatQuery(
}
}
// Convert PromAPI row to CC schema.Series
func (pdb *PrometheusDataRepository) RowToSeries(
from time.Time,
step int64,
steps int64,
row *promm.SampleStream) (schema.Series) {
row *promm.SampleStream) schema.Series {
ts := from.Unix()
hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)
// init array of expected length with NaN
values := make([]schema.Float, steps + 1)
values := make([]schema.Float, steps+1)
for i, _ := range values {
values[i] = schema.NaN
}
@ -258,10 +257,7 @@ func (pdb *PrometheusDataRepository) RowToSeries(
Max: max,
},
}
}
}
func (pdb *PrometheusDataRepository) LoadData(
job *schema.Job,
@ -270,7 +266,7 @@ func (pdb *PrometheusDataRepository) LoadData(
ctx context.Context) (schema.JobData, error) {
// TODO respect requested scope
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode){
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
scopes = append(scopes, schema.MetricScopeNode)
}
@ -285,7 +281,9 @@ func (pdb *PrometheusDataRepository) LoadData(
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func(){log.Notef("Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
logOnce.Do(func() {
log.Infof("Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
})
continue
}
@ -347,9 +345,6 @@ func (pdb *PrometheusDataRepository) LoadData(
return jobData, nil
}
// TODO change implementation to precomputed/cached stats
func (pdb *PrometheusDataRepository) LoadStats(
job *schema.Job,
@ -374,9 +369,6 @@ func (pdb *PrometheusDataRepository) LoadStats(
return stats, nil
}
func (pdb *PrometheusDataRepository) LoadNodeData(
cluster string,
metrics, nodes []string,
@ -393,7 +385,9 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
}
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func(){log.Notef("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
logOnce.Do(func() {
log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
})
continue
}
for _, metric := range metrics {

View File

@ -5,12 +5,15 @@
package repository
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"github.com/qustavo/sqlhooks/v2"
)
var (
@ -20,6 +23,7 @@ var (
type DBConnection struct {
DB *sqlx.DB
Driver string
}
func Connect(driver string, db string) {
@ -28,7 +32,9 @@ func Connect(driver string, db string) {
dbConnOnce.Do(func() {
if driver == "sqlite3" {
dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db))
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db))
// dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db))
if err != nil {
log.Fatal(err)
}
@ -49,7 +55,8 @@ func Connect(driver string, db string) {
log.Fatalf("unsupported database driver: %s", driver)
}
dbConnInstance = &DBConnection{DB: dbHandle}
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
checkDBVersion(driver, dbHandle.DB)
})
}

View File

@ -0,0 +1,28 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// Hooks satisfies the sqlhook.Hooks interface
type Hooks struct{}
// Before hook will print the query with it's args and return the context with the timestamp
func (h *Hooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
log.Infof("SQL query %s %q", query, args)
return context.WithValue(ctx, "begin", time.Now()), nil
}
// After hook will get the timestamp registered on the Before hook and print the elapsed time
func (h *Hooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
begin := ctx.Value("begin").(time.Time)
log.Infof("Took: %s\n", time.Since(begin))
return ctx, nil
}

View File

@ -19,67 +19,6 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// `AUTO_INCREMENT` is in a comment because of this hack:
// https://stackoverflow.com/a/41028314 (sqlite creates unique ids automatically)
const JobsDBSchema string = `
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tag;
CREATE TABLE job (
id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.-
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
`
// Indexes are created after the job-archive is traversed for faster inserts.
const JobsDbIndexes string = `
CREATE INDEX job_by_user ON job (user);
CREATE INDEX job_by_starttime ON job (start_time);
CREATE INDEX job_by_job_id ON job (job_id);
CREATE INDEX job_by_state ON job (job_state);
`
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
@ -210,13 +149,6 @@ func InitDB() error {
starttime := time.Now()
log.Print("Building job table...")
// Basic database structure:
_, err := db.DB.Exec(JobsDBSchema)
if err != nil {
log.Error("Error while initializing basic DB structure")
return err
}
// Inserts are bundled into transactions because in sqlite,
// that speeds up inserts A LOT.
tx, err := db.DB.Beginx()
@ -346,13 +278,6 @@ func InitDB() error {
return err
}
// Create indexes after inserts so that they do not
// need to be continually updated.
if _, err := db.DB.Exec(JobsDbIndexes); err != nil {
log.Warn("Error while creating indices after inserts")
return err
}
log.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
return nil
}

View File

@ -14,9 +14,11 @@ import (
"sync"
"time"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -31,6 +33,7 @@ var (
type JobRepository struct {
DB *sqlx.DB
driver string
stmtCache *sq.StmtCache
cache *lrucache.Cache
@ -45,6 +48,8 @@ func GetJobRepository() *JobRepository {
jobRepoInstance = &JobRepository{
DB: db.DB,
driver: db.Driver,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
@ -91,6 +96,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
}
func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string)
@ -113,6 +119,7 @@ func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer FetchJobName %s", time.Since(start))
if jobName := job.MetaData["jobName"]; jobName != "" {
return &jobName, nil
@ -122,6 +129,7 @@ func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
}
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string)
@ -144,6 +152,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer FetchMetadata %s", time.Since(start))
return job.MetaData, nil
}
@ -192,6 +201,7 @@ func (r *JobRepository) Find(
cluster *string,
startTime *int64) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@ -202,6 +212,7 @@ func (r *JobRepository) Find(
q = q.Where("job.start_time = ?", *startTime)
}
log.Infof("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
@ -215,6 +226,7 @@ func (r *JobRepository) FindAll(
cluster *string,
startTime *int64) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@ -240,6 +252,7 @@ func (r *JobRepository) FindAll(
}
jobs = append(jobs, job)
}
log.Infof("Timer FindAll %s", time.Since(start))
return jobs, nil
}
@ -322,6 +335,7 @@ func (r *JobRepository) DeleteJobById(id int64) error {
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
start := time.Now()
if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate")
}
@ -337,7 +351,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
default:
log.Notef("CountGroupedJobs() Weight %v unknown.", *weight)
log.Infof("CountGroupedJobs() Weight %v unknown.", *weight)
}
}
@ -368,6 +382,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
counts[group] = count
}
log.Infof("Timer CountGroupedJobs %s", time.Since(start))
return counts, nil
}
@ -405,7 +420,7 @@ func (r *JobRepository) MarkArchived(
case "file_bw":
stmt = stmt.Set("file_bw_avg", stats.Avg)
default:
log.Notef("MarkArchived() Metric '%v' unknown", metric)
log.Infof("MarkArchived() Metric '%v' unknown", metric)
}
}
@ -643,6 +658,7 @@ func (r *JobRepository) FindNameByUser(ctx context.Context, searchterm string) (
}
func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (project string, err error) {
user := auth.GetUser(ctx)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("job.project").Distinct().From("job").
@ -654,7 +670,6 @@ func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (pro
return project, nil
}
return "", ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Project -> %s: Forbidden", user.Name, project)
return "", ErrForbidden
@ -663,6 +678,7 @@ func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (pro
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
@ -674,12 +690,15 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
if err != nil {
return nil, err
}
log.Infof("Timer Partitions %s", time.Since(start))
return partitions.([]string), nil
}
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
start := time.Now()
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Where("job.job_state = 'running'").
@ -716,10 +735,13 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
}
}
log.Infof("Timer AllocatedNodes %s", time.Since(start))
return subclusters, nil
}
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
Set("duration", 0).
@ -740,7 +762,211 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
}
if rowsAffected > 0 {
log.Notef("%d jobs have been marked as failed due to running too long", rowsAffected)
log.Infof("%d jobs have been marked as failed due to running too long", rowsAffected)
}
log.Infof("Timer StopJobsExceedingWalltimeBy %s", time.Since(start))
return nil
}
// TODO: Move to config
const ShortJobDuration int = 5 * 60
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *JobRepository) JobsStatistics(ctx context.Context,
filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map[string]*model.JobsStatistics{}
var castType string
if r.driver == "sqlite3" {
castType = "int"
} else if r.driver == "mysql" {
castType = "unsigned"
}
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range archive.Clusters {
for _, subcluster := range cluster.SubClusters {
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)
var query sq.SelectBuilder
if groupBy == nil {
query = sq.Select(
"''",
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job")
} else {
col := groupBy2column[*groupBy]
query = sq.Select(
col,
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job").GroupBy(col)
}
query = query.
Where("job.cluster = ?", cluster.Name).
Where("job.subcluster = ?", subcluster.Name)
query = SecurityCheck(ctx, query)
for _, f := range filter {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
}
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = SecurityCheck(ctx, query)
for _, f := range filter {
query = BuildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
log.Warn("Error while scanning rows for short job stats")
return nil, err
}
} else {
col := groupBy2column[*groupBy]
query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = SecurityCheck(ctx, query)
for _, f := range filter {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying jobs for short jobs")
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
log.Warn("Error while scanning rows for short jobs")
return nil, err
}
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver can not be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histDuration" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
}
}
log.Infof("Timer JobStatistics %s", time.Since(start))
return res, nil
}
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
start := time.Now()
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
query = SecurityCheck(ctx, query)
for _, f := range filters {
query = BuildWhereClause(f, query)
}
if len(id) != 0 && len(col) != 0 {
query = query.Where(col+" = ?", id)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
points := make([]*model.HistoPoint, 0)
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
points = append(points, &point)
}
log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}

View File

@ -8,10 +8,12 @@ import (
"fmt"
"testing"
"github.com/ClusterCockpit/cc-backend/pkg/log"
_ "github.com/mattn/go-sqlite3"
)
func init() {
log.Init("info", true)
Connect("sqlite3", "../../test/test.db")
}

View File

@ -0,0 +1,109 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"database/sql"
"embed"
"fmt"
"os"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/mysql"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
)
const supportedVersion uint = 2
//go:embed migrations/*
var migrationFiles embed.FS
func checkDBVersion(backend string, db *sql.DB) {
var m *migrate.Migrate
if backend == "sqlite3" {
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
if err != nil {
log.Fatal(err)
}
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithInstance("iofs", d, "sqlite3", driver)
if err != nil {
log.Fatal(err)
}
} else if backend == "mysql" {
driver, err := mysql.WithInstance(db, &mysql.Config{})
if err != nil {
log.Fatal(err)
}
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithInstance("iofs", d, "mysql", driver)
if err != nil {
log.Fatal(err)
}
}
v, _, err := m.Version()
if err != nil {
if err == migrate.ErrNilVersion {
log.Warn("Legacy database without version or missing database file!")
} else {
log.Fatal(err)
}
}
if v < supportedVersion {
log.Warnf("Unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, supportedVersion)
os.Exit(0)
}
if v > supportedVersion {
log.Warnf("Unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool!", v, supportedVersion)
os.Exit(0)
}
}
func MigrateDB(backend string, db string) {
var m *migrate.Migrate
if backend == "sqlite3" {
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
if err != nil {
log.Fatal(err)
}
} else if backend == "mysql" {
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
if err != nil {
log.Fatal(err)
}
}
if err := m.Up(); err != nil {
log.Fatal(err)
}
m.Close()
}

View File

@ -0,0 +1,5 @@
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tags;
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS configuration;
DROP TABLE IF EXISTS user;

View File

@ -0,0 +1,62 @@
CREATE TABLE IF NOT EXISTS job (
id INTEGER AUTO_INCREMENT PRIMARY KEY ,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
`partition` VARCHAR(255) NOT NULL,
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE IF NOT EXISTS jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
CREATE TABLE IF NOT EXISTS user (
username varchar(255) PRIMARY KEY NOT NULL,
password varchar(255) DEFAULT NULL,
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
name varchar(255) DEFAULT NULL,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);

View File

@ -0,0 +1,5 @@
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_by_state;

View File

@ -0,0 +1,5 @@
CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);

View File

@ -0,0 +1,5 @@
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tags;
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS configuration;
DROP TABLE IF EXISTS user;

View File

@ -0,0 +1,62 @@
CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
partition VARCHAR(255) NOT NULL,
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE IF NOT EXISTS jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
CREATE TABLE IF NOT EXISTS user (
username varchar(255) PRIMARY KEY NOT NULL,
password varchar(255) DEFAULT NULL,
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
name varchar(255) DEFAULT NULL,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);

View File

@ -0,0 +1,5 @@
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_by_state;

View File

@ -0,0 +1,5 @@
CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);

View File

@ -33,18 +33,6 @@ func GetUserCfgRepo() *UserCfgRepo {
userCfgRepoOnce.Do(func() {
db := GetConnection()
_, err := db.DB.Exec(`
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
if err != nil {
log.Fatalf("db.DB.exec() error: %v", err)
}
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
log.Fatalf("db.DB.Preparex() error: %v", err)

View File

@ -14,7 +14,6 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
@ -73,7 +72,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
from := time.Now().Add(-24 * time.Hour)
recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
StartTime: &schema.TimeRange{From: &from, To: nil},
Duration: &schema.IntRange{From: 0, To: graph.ShortJobDuration},
Duration: &schema.IntRange{From: 0, To: repository.ShortJobDuration},
}}, nil, nil)
if err != nil {
log.Warnf("failed to count jobs: %s", err.Error())

View File

@ -10,9 +10,14 @@ import (
"testing"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func init() {
log.Init("info", true)
}
func TestInitEmptyPath(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"kind\":\"../../test/archive\"}"))

View File

@ -19,7 +19,6 @@ import (
var (
DebugWriter io.Writer = os.Stderr
NoteWriter io.Writer = os.Stderr
InfoWriter io.Writer = os.Stderr
WarnWriter io.Writer = os.Stderr
ErrWriter io.Writer = os.Stderr
@ -29,19 +28,17 @@ var (
var (
DebugPrefix string = "<7>[DEBUG] "
InfoPrefix string = "<6>[INFO] "
NotePrefix string = "<5>[NOTICE] "
WarnPrefix string = "<4>[WARNING] "
ErrPrefix string = "<3>[ERROR] "
CritPrefix string = "<2>[CRITICAL] "
)
var (
DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, 0)
InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, 0)
NoteLog *log.Logger = log.New(NoteWriter, NotePrefix, log.Lshortfile)
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.Llongfile)
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.Llongfile)
DebugLog *log.Logger
InfoLog *log.Logger
WarnLog *log.Logger
ErrLog *log.Logger
CritLog *log.Logger
)
/* CONFIG */
@ -57,9 +54,6 @@ func Init(lvl string, logdate bool) {
case "warn":
InfoWriter = io.Discard
fallthrough
case "notice":
NoteWriter = io.Discard
fallthrough
case "info":
DebugWriter = io.Discard
case "debug":
@ -72,15 +66,13 @@ func Init(lvl string, logdate bool) {
if !logdate {
DebugLog = log.New(DebugWriter, DebugPrefix, 0)
InfoLog = log.New(InfoWriter, InfoPrefix, 0)
NoteLog = log.New(NoteWriter, NotePrefix, log.Lshortfile)
InfoLog = log.New(InfoWriter, InfoPrefix, log.Lshortfile)
WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.Llongfile)
} else {
DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags)
NoteLog = log.New(NoteWriter, NotePrefix, log.LstdFlags|log.Lshortfile)
InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile)
WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
@ -108,10 +100,6 @@ func Info(v ...interface{}) {
InfoLog.Output(2, printStr(v...))
}
func Note(v ...interface{}) {
NoteLog.Output(2, printStr(v...))
}
func Warn(v ...interface{}) {
WarnLog.Output(2, printStr(v...))
}
@ -157,10 +145,6 @@ func Infof(format string, v ...interface{}) {
InfoLog.Output(2, printfStr(format, v...))
}
func Notef(format string, v ...interface{}) {
NoteLog.Output(2, printfStr(format, v...))
}
func Warnf(format string, v ...interface{}) {
WarnLog.Output(2, printfStr(format, v...))
}

View File

@ -11,7 +11,6 @@ else
tar xJf job-archive-dev.tar.xz
rm ./job-archive-dev.tar.xz
touch ./job.db
cd ../web/frontend
yarn install
yarn build
@ -21,5 +20,6 @@ else
cp ./docs/config.json config.json
go build ./cmd/cc-backend
./cc-backend --migrate-db
./cc-backend --server --dev --init-db --add-user demo:admin:AdminDev
fi

View File

@ -20,6 +20,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/gorilla/mux"
@ -245,6 +246,7 @@ func setup(t *testing.T) *api.RestApi {
]
}`
log.Init("info", true)
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
if err := os.Mkdir(jobarchive, 0777); err != nil {
@ -267,11 +269,7 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}
dbfilepath := filepath.Join(tmpdir, "test.db")
f, err := os.Create(dbfilepath)
if err != nil {
t.Fatal(err)
}
f.Close()
repository.MigrateDB("sqlite3", dbfilepath)
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
@ -292,10 +290,6 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}
if _, err := db.DB.Exec(repository.JobsDBSchema); err != nil {
t.Fatal(err)
}
jobRepo := repository.GetJobRepository()
resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo}

Binary file not shown.