diff --git a/.gitmodules b/.gitmodules index 4a8b121..11b3928 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,5 +1,5 @@ [submodule "frontend"] - path = frontend + path = web/frontend url = git@github.com:ClusterCockpit/cc-frontend.git branch = main update = merge diff --git a/graph/schema.graphqls b/api/schema.graphqls similarity index 100% rename from graph/schema.graphqls rename to api/schema.graphqls diff --git a/server.go b/cmd/cc-backend/main.go similarity index 89% rename from server.go rename to cmd/cc-backend/main.go index 5138a1b..f03d778 100644 --- a/server.go +++ b/cmd/cc-backend/main.go @@ -22,26 +22,25 @@ import ( "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" - "github.com/ClusterCockpit/cc-backend/api" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/graph" - "github.com/ClusterCockpit/cc-backend/graph/generated" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/repository" - "github.com/ClusterCockpit/cc-backend/templates" + "github.com/ClusterCockpit/cc-backend/internal/api" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/graph/generated" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/routerConfig" + "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv" + "github.com/ClusterCockpit/cc-backend/internal/templates" + "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/google/gops/agent" "github.com/gorilla/handlers" "github.com/gorilla/mux" - "github.com/jmoiron/sqlx" _ "github.com/go-sql-driver/mysql" _ "github.com/mattn/go-sqlite3" ) -var jobRepo *repository.JobRepository - // Format of the configuration (file). See below for the defaults. type ProgramConfig struct { // Address where the http (or https) server will listen on (for example: 'localhost:80'). @@ -152,7 +151,7 @@ func main() { } } - if err := loadEnv("./.env"); err != nil && !os.IsNotExist(err) { + if err := runtimeEnv.LoadEnv("./.env"); err != nil && !os.IsNotExist(err) { log.Fatalf("parsing './.env' file failed: %s", err.Error()) } @@ -178,28 +177,8 @@ func main() { } var err error - var db *sqlx.DB - if programConfig.DBDriver == "sqlite3" { - db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", programConfig.DB)) - if err != nil { - log.Fatal(err) - } - - // sqlite does not multithread. Having more than one connection open would just mean - // waiting for locks. - db.SetMaxOpenConns(1) - } else if programConfig.DBDriver == "mysql" { - db, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", programConfig.DB)) - if err != nil { - log.Fatal(err) - } - - db.SetConnMaxLifetime(time.Minute * 3) - db.SetMaxOpenConns(10) - db.SetMaxIdleConns(10) - } else { - log.Fatalf("unsupported database driver: %s", programConfig.DBDriver) - } + repository.Connect(programConfig.DBDriver, programConfig.DB) + db := repository.GetConnection() // Initialize sub-modules and handle all command line flags. // The order here is important!
For example, the metricdata package @@ -215,7 +194,7 @@ func main() { authentication.JwtMaxAge = d } - if err := authentication.Init(db, programConfig.LdapConfig); err != nil { + if err := authentication.Init(db.DB, programConfig.LdapConfig); err != nil { log.Fatal(err) } @@ -257,7 +236,7 @@ func main() { log.Fatal("arguments --add-user and --del-user can only be used if authentication is enabled") } - if err := config.Init(db, !programConfig.DisableAuthentication, programConfig.UiDefaults, programConfig.JobArchive); err != nil { + if err := config.Init(db.DB, !programConfig.DisableAuthentication, programConfig.UiDefaults, programConfig.JobArchive); err != nil { log.Fatal(err) } @@ -266,15 +245,12 @@ func main() { } if flagReinitDB { - if err := repository.InitDB(db, programConfig.JobArchive); err != nil { + if err := repository.InitDB(db.DB, programConfig.JobArchive); err != nil { log.Fatal(err) } } - jobRepo = &repository.JobRepository{DB: db} - if err := jobRepo.Init(); err != nil { - log.Fatal(err) - } + jobRepo := repository.GetRepository() if flagImportJob != "" { if err := jobRepo.HandleImportFlag(flagImportJob); err != nil { @@ -288,7 +264,7 @@ func main() { // Setup the http.Handler/Router used by the server - resolver := &graph.Resolver{DB: db, Repo: jobRepo} + resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo} graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) if os.Getenv("DEBUG") != "1" { // Having this handler means that an error message is returned via GraphQL instead of the connection simply being closed. @@ -394,7 +370,7 @@ func main() { }) // Mount all /monitoring/... and /api/... routes. - setupRoutes(secured, routes) + routerConfig.SetupRoutes(secured) api.MountRoutes(secured) r.PathPrefix("/").Handler(http.FileServer(http.Dir(programConfig.StaticFiles))) @@ -461,7 +437,7 @@ func main() { // Because this program will want to bind to a privileged port (like 80), the listener must // be established first, then the user can be changed, and after that, // the actual http server can be started. - if err := dropPrivileges(); err != nil { + if err := runtimeEnv.DropPrivileges(programConfig.User, programConfig.Group); err != nil { log.Fatalf("error while changing user: %s", err.Error()) } @@ -479,7 +455,7 @@ func main() { go func() { defer wg.Done() <-sigs - systemdNotifiy(false, "shutting down") + runtimeEnv.SystemdNotifiy(false, "shutting down") // First shut down the server gracefully (waiting for all ongoing requests) server.Shutdown(context.Background()) @@ -503,7 +479,7 @@ func main() { if os.Getenv("GOGC") == "" { debug.SetGCPercent(25) } - systemdNotifiy(true, "running") + runtimeEnv.SystemdNotifiy(true, "running") wg.Wait() log.Print("Graceful shutdown completed!") } diff --git a/frontend b/frontend index 94ef11a..4d698c5 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 94ef11aa9fc3c194f1df497e3e06c60a7125883d +Subproject commit 4d698c519a56dd411dde7001beb0b73eb60157b9 diff --git a/gqlgen.yml b/gqlgen.yml index f02ce61..830edd2 100644 --- a/gqlgen.yml +++ b/gqlgen.yml @@ -1,10 +1,10 @@ # Where are all the schema files located? globs are supported eg src/**/*.graphqls schema: - - graph/*.graphqls + - api/*.graphqls # Where should the generated server code go? exec: - filename: graph/generated/generated.go + filename: internal/graph/generated/generated.go package: generated # Uncomment to enable federation @@ -14,7 +14,7 @@ exec: # Where should any generated models go?
model: - filename: graph/model/models_gen.go + filename: internal/graph/model/models_gen.go package: model # Where should the resolver implementations go? @@ -75,5 +75,3 @@ models: Series: { model: "github.com/ClusterCockpit/cc-backend/schema.Series" } MetricStatistics: { model: "github.com/ClusterCockpit/cc-backend/schema.MetricStatistics" } StatsSeries: { model: "github.com/ClusterCockpit/cc-backend/schema.StatsSeries" } - - diff --git a/api/rest.go b/internal/api/rest.go similarity index 97% rename from api/rest.go rename to internal/api/rest.go index 83a71f3..b561b18 100644 --- a/api/rest.go +++ b/internal/api/rest.go @@ -16,14 +16,14 @@ import ( "sync" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/graph" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/repository" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/gorilla/mux" ) diff --git a/auth/auth.go b/internal/auth/auth.go similarity index 99% rename from auth/auth.go rename to internal/auth/auth.go index 0a99976..3fa6f1d 100644 --- a/auth/auth.go +++ b/internal/auth/auth.go @@ -14,8 +14,8 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/log" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/log" sq "github.com/Masterminds/squirrel" "github.com/golang-jwt/jwt/v4" "github.com/gorilla/sessions" diff --git a/auth/ldap.go b/internal/auth/ldap.go similarity index 98% rename from auth/ldap.go rename to internal/auth/ldap.go index 4c5e0d5..8a4f40b 100644 --- a/auth/ldap.go +++ b/internal/auth/ldap.go @@ -6,8 +6,7 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/log" - + "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/go-ldap/ldap/v3" ) diff --git a/config/config.go b/internal/config/config.go similarity index 97% rename from config/config.go rename to internal/config/config.go index 76adeb0..cfb1ca8 100644 --- a/config/config.go +++ b/internal/config/config.go @@ -11,9 +11,9 @@ import ( "sync" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/iamlouk/lrucache" "github.com/jmoiron/sqlx" ) diff --git a/config/nodelist.go b/internal/config/nodelist.go similarity index 98% rename from config/nodelist.go rename to internal/config/nodelist.go index fb823df..715f55a 100644 --- a/config/nodelist.go +++ b/internal/config/nodelist.go @@ -5,7 +5,7 @@ import ( "strconv" "strings" - "github.com/ClusterCockpit/cc-backend/log" + "github.com/ClusterCockpit/cc-backend/pkg/log" ) type NodeList [][]interface { diff --git 
a/config/nodelist_test.go b/internal/config/nodelist_test.go similarity index 100% rename from config/nodelist_test.go rename to internal/config/nodelist_test.go diff --git a/graph/generated/generated.go b/internal/graph/generated/generated.go similarity index 99% rename from graph/generated/generated.go rename to internal/graph/generated/generated.go index e1e5db4..3c62d5d 100644 --- a/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -13,8 +13,8 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/introspection" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/schema" gqlparser "github.com/vektah/gqlparser/v2" "github.com/vektah/gqlparser/v2/ast" ) diff --git a/graph/model/models.go b/internal/graph/model/models.go similarity index 100% rename from graph/model/models.go rename to internal/graph/model/models.go diff --git a/graph/model/models_gen.go b/internal/graph/model/models_gen.go similarity index 99% rename from graph/model/models_gen.go rename to internal/graph/model/models_gen.go index ca8186e..91263ef 100644 --- a/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -8,7 +8,7 @@ import ( "strconv" "time" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) type Accelerator struct { diff --git a/graph/resolver.go b/internal/graph/resolver.go similarity index 81% rename from graph/resolver.go rename to internal/graph/resolver.go index ce08e33..dd7bc3b 100644 --- a/graph/resolver.go +++ b/internal/graph/resolver.go @@ -1,7 +1,7 @@ package graph import ( - "github.com/ClusterCockpit/cc-backend/repository" + "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/jmoiron/sqlx" ) diff --git a/internal/graph/schema.graphqls b/internal/graph/schema.graphqls new file mode 100644 index 0000000..8652bed --- /dev/null +++ b/internal/graph/schema.graphqls @@ -0,0 +1,275 @@ +scalar Time +scalar Any + +scalar NullableFloat +scalar MetricScope +scalar JobState + +type Job { + id: ID! + jobId: Int! + user: String! + project: String! + cluster: String! + subCluster: String! + startTime: Time! + duration: Int! + walltime: Int! + numNodes: Int! + numHWThreads: Int! + numAcc: Int! + SMT: Int! + exclusive: Int! + partition: String! + arrayJobId: Int! + monitoringStatus: Int! + state: JobState! + tags: [Tag!]! + resources: [Resource!]! + + metaData: Any + userData: User +} + +type Cluster { + name: String! + partitions: [String!]! # Slurm partitions + metricConfig: [MetricConfig!]! + filterRanges: FilterRanges! + subClusters: [SubCluster!]! # Hardware partitions/subclusters +} + +type SubCluster { + name: String! + nodes: String! + numberOfNodes: Int! + processorType: String! + socketsPerNode: Int! + coresPerSocket: Int! + threadsPerCore: Int! + flopRateScalar: Int! + flopRateSimd: Int! + memoryBandwidth: Int! + topology: Topology! +} + +type Topology { + node: [Int!] + socket: [[Int!]!] + memoryDomain: [[Int!]!] + die: [[Int!]!] + core: [[Int!]!] + accelerators: [Accelerator!] +} + +type Accelerator { + id: String! + type: String! + model: String! +} + +type SubClusterConfig { + name: String! + peak: Float! + normal: Float! + caution: Float! + alert: Float! +} + +type MetricConfig { + name: String! + unit: String! + scope: MetricScope! + aggregation: String + timestep: Int! 
+ peak: Float + normal: Float + caution: Float + alert: Float + subClusters: [SubClusterConfig] +} + +type Tag { + id: ID! + type: String! + name: String! +} + +type Resource { + hostname: String! + hwthreads: [Int!] + accelerators: [String!] + configuration: String +} + +type JobMetricWithName { + name: String! + metric: JobMetric! +} + +type JobMetric { + unit: String! + scope: MetricScope! + timestep: Int! + series: [Series!] + statisticsSeries: StatsSeries +} + +type Series { + hostname: String! + id: Int + statistics: MetricStatistics + data: [NullableFloat!]! +} + +type MetricStatistics { + avg: Float! + min: Float! + max: Float! +} + +type StatsSeries { + mean: [NullableFloat!]! + min: [NullableFloat!]! + max: [NullableFloat!]! +} + +type MetricFootprints { + metric: String! + data: [NullableFloat!]! +} + +type Footprints { + nodehours: [NullableFloat!]! + metrics: [MetricFootprints!]! +} + +enum Aggregate { USER, PROJECT, CLUSTER } +enum Weights { NODE_COUNT, NODE_HOURS } + +type NodeMetrics { + host: String! + subCluster: String! + metrics: [JobMetricWithName!]! +} + +type Count { + name: String! + count: Int! +} + +type User { + username: String! + name: String! + email: String! +} + +type Query { + clusters: [Cluster!]! # List of all clusters + tags: [Tag!]! # List of all tags + + user(username: String!): User + allocatedNodes(cluster: String!): [Count!]! + + job(id: ID!): Job + jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! + jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints + + jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! + jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! + jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! + + rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! + + nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]! +} + +type Mutation { + createTag(type: String!, name: String!): Tag! + deleteTag(id: ID!): ID! + addTagsToJob(job: ID!, tagIds: [ID!]!): [Tag!]! + removeTagsFromJob(job: ID!, tagIds: [ID!]!): [Tag!]! + + updateConfiguration(name: String!, value: String!): String +} + +type IntRangeOutput { from: Int!, to: Int! } +type TimeRangeOutput { from: Time!, to: Time! } + +type FilterRanges { + duration: IntRangeOutput! + numNodes: IntRangeOutput! + startTime: TimeRangeOutput! +} + +input JobFilter { + tags: [ID!] + jobId: StringInput + arrayJobId: Int + user: StringInput + project: StringInput + cluster: StringInput + partition: StringInput + duration: IntRange + + minRunningFor: Int + + numNodes: IntRange + numAccelerators: IntRange + numHWThreads: IntRange + + startTime: TimeRange + state: [JobState!] + flopsAnyAvg: FloatRange + memBwAvg: FloatRange + loadAvg: FloatRange + memUsedMax: FloatRange +} + +input OrderByInput { + field: String! + order: SortDirectionEnum! = ASC +} + +enum SortDirectionEnum { + DESC + ASC +} + +input StringInput { + eq: String + contains: String + startsWith: String + endsWith: String +} + +input IntRange { from: Int!, to: Int! } +input FloatRange { from: Float!, to: Float! } +input TimeRange { from: Time, to: Time } + +type JobResultList { + items: [Job!]! + offset: Int + limit: Int + count: Int +} + +type HistoPoint { + count: Int! + value: Int! +} + +type JobsStatistics { + id: ID! 
# If `groupBy` was used, ID of the user/project/cluster + totalJobs: Int! # Number of jobs that matched + shortJobs: Int! # Number of jobs with a duration of less than 2 minutes + totalWalltime: Int! # Sum of the duration of all matched jobs in hours + totalCoreHours: Int! # Sum of the core hours of all matched jobs + histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value + histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes +} + +input PageRequest { + itemsPerPage: Int! + page: Int! +} diff --git a/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go similarity index 95% rename from graph/schema.resolvers.go rename to internal/graph/schema.resolvers.go index 46b4d7f..d8e94b9 100644 --- a/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -10,12 +10,12 @@ import ( "strconv" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/graph/generated" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph/generated" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) func (r *clusterResolver) Partitions(ctx context.Context, obj *model.Cluster) ([]string, error) { diff --git a/graph/stats.go b/internal/graph/stats.go similarity index 96% rename from graph/stats.go rename to internal/graph/stats.go index 52c8443..c3d90c9 100644 --- a/graph/stats.go +++ b/internal/graph/stats.go @@ -9,11 +9,11 @@ import ( "time" "github.com/99designs/gqlgen/graphql" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/repository" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" ) diff --git a/metricdata/archive.go b/internal/metricdata/archive.go similarity index 98% rename from metricdata/archive.go rename to internal/metricdata/archive.go index 80271f0..80b5298 100644 --- a/metricdata/archive.go +++ b/internal/metricdata/archive.go @@ -13,8 +13,8 @@ import ( "strconv" "time" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) // For a given job, return the path of the `data.json`/`meta.json` file. 
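The renames above all follow one rule: code that other projects may depend on moves to pkg/, everything else to internal/, which the Go compiler refuses to import from outside the cc-backend module. A minimal sketch of that boundary as seen from a hypothetical external module (the consuming module itself is an assumption; schema.JobStateRunning is real and is referenced later in this diff, in internal/routerConfig/routes.go):

    package main

    import (
        "fmt"

        // Packages under pkg/ remain part of the public API.
        "github.com/ClusterCockpit/cc-backend/pkg/schema"

        // Anything under internal/ is rejected at compile time outside
        // this repository, e.g.:
        //   use of internal package github.com/ClusterCockpit/cc-backend/internal/repository not allowed
        // "github.com/ClusterCockpit/cc-backend/internal/repository"
    )

    func main() {
        // JobState constants are exported by pkg/schema.
        fmt.Println(schema.JobStateRunning) // prints "running"
    }
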
diff --git a/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go similarity index 99% rename from metricdata/cc-metric-store.go rename to internal/metricdata/cc-metric-store.go index e26b72f..8deab14 100644 --- a/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -11,8 +11,8 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) type CCMetricStoreConfig struct { diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go new file mode 100644 index 0000000..6a47bbd --- /dev/null +++ b/internal/metricdata/influxdb-v2.go @@ -0,0 +1,308 @@ +package metricdata + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "log" + "strings" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" +) + +type InfluxDBv2DataRepositoryConfig struct { + Url string `json:"url"` + Token string `json:"token"` + Bucket string `json:"bucket"` + Org string `json:"org"` + SkipTls bool `json:"skiptls"` +} + +type InfluxDBv2DataRepository struct { + client influxdb2.Client + queryClient influxdb2Api.QueryAPI + bucket, measurement string +} + +func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error { + var config InfluxDBv2DataRepositoryConfig + if err := json.Unmarshal(rawConfig, &config); err != nil { + return err + } + + idb.client = influxdb2.NewClientWithOptions(config.Url, config.Token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: config.SkipTls})) + idb.queryClient = idb.client.QueryAPI(config.Org) + idb.bucket = config.Bucket + + return nil +} + +func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string { + return t.Format(time.RFC3339) // Like “2006-01-02T15:04:05Z07:00” +} + +func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time { + return time.Unix(epoch, 0) +} + +func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + + measurementsConds := make([]string, 0, len(metrics)) + for _, m := range metrics { + measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) + } + measurementsCond := strings.Join(measurementsConds, " or ") + + hostsConds := make([]string, 0, len(job.Resources)) + for _, h := range job.Resources { + if h.HWThreads != nil || h.Accelerators != nil { + // TODO + return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") + } + hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) + } + hostsCond := strings.Join(hostsConds, " or ") + + jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC + // Requested Scopes + for _, scope := range scopes { + query := "" + switch scope { + case "node": + // Get Finest Granularity, Group By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows + // log.Println("Note: Scope 'node' requested.
") + query = fmt.Sprintf(` + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => (%s) and (%s) ) + |> drop(columns: ["_start", "_stop"]) + |> group(columns: ["hostname", "_measurement"]) + |> aggregateWindow(every: 60s, fn: mean) + |> drop(columns: ["_time"])`, + idb.bucket, + idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))), + measurementsCond, hostsCond) + case "socket": + log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ") + continue + case "core": + log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ") + continue + // Get Finest Granularity only, Set NULL to 0.0 + // query = fmt.Sprintf(` + // from(bucket: "%s") + // |> range(start: %s, stop: %s) + // |> filter(fn: (r) => %s ) + // |> filter(fn: (r) => %s ) + // |> drop(columns: ["_start", "_stop", "cluster"]) + // |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, + // idb.bucket, + // idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), + // measurementsCond, hostsCond) + default: + log.Println("Note: Unknown Scope requested: Will return 'node' scope. ") + continue + // return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node'") + } + + rows, err := idb.queryClient.Query(ctx, query) + if err != nil { + return nil, err + } + + // Init Metrics: Only Node level now -> TODO: Matching /check on scope level ... + for _, metric := range metrics { + jobMetric, ok := jobData[metric] + if !ok { + mc := config.GetMetricConfig(job.Cluster, metric) + jobMetric = map[schema.MetricScope]*schema.JobMetric{ + scope: { // uses scope var from above! 
+ Unit: mc.Unit, + Scope: scope, + Timestep: mc.Timestep, + Series: make([]schema.Series, 0, len(job.Resources)), + StatisticsSeries: nil, // Should be: &schema.StatsSeries{}, + }, + } + } + jobData[metric] = jobMetric + } + + // Process Result: Time-Data + field, host, hostSeries := "", "", schema.Series{} + // typeId := 0 + switch scope { + case "node": + for rows.Next() { + row := rows.Record() + if host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() { + if host != "" { + // Append Series before reset + jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) + } + field, host = row.Measurement(), row.ValueByKey("hostname").(string) + hostSeries = schema.Series{ + Hostname: host, + Statistics: nil, + Data: make([]schema.Float, 0), + } + } + val, ok := row.Value().(float64) + if ok { + hostSeries.Data = append(hostSeries.Data, schema.Float(val)) + } else { + hostSeries.Data = append(hostSeries.Data, schema.Float(0)) + } + } + case "socket": + continue + case "core": + continue + // Include Series.Id in hostSeries + // for rows.Next() { + // row := rows.Record() + // if ( host == "" || host != row.ValueByKey("hostname").(string) || typeId != row.ValueByKey("type-id").(int) || rows.TableChanged() ) { + // if ( host != "" ) { + // // Append Series before reset + // jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) + // } + // field, host, typeId = row.Measurement(), row.ValueByKey("hostname").(string), row.ValueByKey("type-id").(int) + // hostSeries = schema.Series{ + // Hostname: host, + // Id: &typeId, + // Statistics: nil, + // Data: make([]schema.Float, 0), + // } + // } + // val := row.Value().(float64) + // hostSeries.Data = append(hostSeries.Data, schema.Float(val)) + // } + default: + continue + // return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'") + } + // Append last Series + jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) + } + + // Get Stats + stats, err := idb.LoadStats(job, metrics, ctx) + if err != nil { + return nil, err + } + + for _, scope := range scopes { + if scope == "node" { // No 'socket/core' support yet + for metric, nodes := range stats { + // log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric)) + for node, stats := range nodes { + // log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg )) + for index, _ := range jobData[metric][scope].Series { + // log.Println(fmt.Sprintf("<< Try to add Stats to Series in Position %d >>", index)) + if jobData[metric][scope].Series[index].Hostname == node { + // log.Println(fmt.Sprintf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname)) + jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max} + // log.Println(fmt.Sprintf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg)) + } + } + } + } + } + } + + // DEBUG: + // for _, scope := range scopes { + // for _, met := range metrics { + // for _, series := range jobData[met][scope].Series { + // log.Println(fmt.Sprintf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>", + // len(series.Data), met, 
series.Hostname, scope, + // series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)) + // } + // } + // } + + return jobData, nil +} + +func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { + + stats := map[string]map[string]schema.MetricStatistics{} + + hostsConds := make([]string, 0, len(job.Resources)) + for _, h := range job.Resources { + if h.HWThreads != nil || h.Accelerators != nil { + // TODO + return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") + } + hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) + } + hostsCond := strings.Join(hostsConds, " or ") + + // lenMet := len(metrics) + + for _, metric := range metrics { + // log.Println(fmt.Sprintf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet)) + + query := fmt.Sprintf(` + data = from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and r._field == "value" and (%s)) + union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), + data |> min(column: "_value") |> set(key: "_field", value: "min"), + data |> max(column: "_value") |> set(key: "_field", value: "max")]) + |> pivot(rowKey: ["hostname"], columnKey: ["_field"], valueColumn: "_value") + |> group()`, + idb.bucket, + idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))), + metric, hostsCond) + + rows, err := idb.queryClient.Query(ctx, query) + if err != nil { + return nil, err + } + + nodes := map[string]schema.MetricStatistics{} + for rows.Next() { + row := rows.Record() + host := row.ValueByKey("hostname").(string) + + avg, avgok := row.ValueByKey("avg").(float64) + if !avgok { + // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg)) + avg = 0.0 + } + min, minok := row.ValueByKey("min").(float64) + if !minok { + // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min)) + min = 0.0 + } + max, maxok := row.ValueByKey("max").(float64) + if !maxok { + // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MAX. 
Expected 'float64', got %v", metric, max)) + max = 0.0 + } + + nodes[host] = schema.MetricStatistics{ + Avg: avg, + Min: min, + Max: max, + } + } + stats[metric] = nodes + } + + return stats, nil +} + +func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { + // TODO : Implement to be used in Analysis and System/Node-View + log.Println(fmt.Sprintf("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)) + + return nil, errors.New("unimplemented for InfluxDBv2DataRepository") +} diff --git a/metricdata/metricdata.go b/internal/metricdata/metricdata.go similarity index 97% rename from metricdata/metricdata.go rename to internal/metricdata/metricdata.go index 24b44bc..019ddf5 100644 --- a/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -6,9 +6,9 @@ import ( "fmt" "time" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/iamlouk/lrucache" ) diff --git a/metricdata/utils.go b/internal/metricdata/utils.go similarity index 95% rename from metricdata/utils.go rename to internal/metricdata/utils.go index 7a92c4d..a6c550b 100644 --- a/metricdata/utils.go +++ b/internal/metricdata/utils.go @@ -5,7 +5,7 @@ import ( "encoding/json" "time" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go new file mode 100644 index 0000000..92ed703 --- /dev/null +++ b/internal/repository/dbConnection.go @@ -0,0 +1,58 @@ +package repository + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/jmoiron/sqlx" +) + +var ( + dbConnOnce sync.Once + dbConnInstance *DBConnection +) + +type DBConnection struct { + DB *sqlx.DB +} + +func Connect(driver string, db string) { + var err error + var dbHandle *sqlx.DB + + dbConnOnce.Do(func() { + if driver == "sqlite3" { + dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db)) + if err != nil { + log.Fatal(err) + } + + // sqlite does not multithread. Having more than one connection open would just mean + // waiting for locks.
+ dbHandle.SetMaxOpenConns(1) + } else if driver == "mysql" { + dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db)) + if err != nil { + log.Fatal(err) + } + + dbHandle.SetConnMaxLifetime(time.Minute * 3) + dbHandle.SetMaxOpenConns(10) + dbHandle.SetMaxIdleConns(10) + } else { + log.Fatalf("unsupported database driver: %s", driver) + } + + dbConnInstance = &DBConnection{DB: dbHandle} + }) +} + +func GetConnection() *DBConnection { + if dbConnInstance == nil { + log.Fatalf("Database connection not initialized!") + } + + return dbConnInstance +} diff --git a/repository/import.go b/internal/repository/import.go similarity index 95% rename from repository/import.go rename to internal/repository/import.go index a18c189..69a5c4f 100644 --- a/repository/import.go +++ b/internal/repository/import.go @@ -9,10 +9,10 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) const NamedJobInsert string = `INSERT INTO job ( diff --git a/repository/init.go b/internal/repository/init.go similarity index 98% rename from repository/init.go rename to internal/repository/init.go index 44b8bd6..a6b84a4 100644 --- a/repository/init.go +++ b/internal/repository/init.go @@ -8,8 +8,8 @@ import ( "path/filepath" "time" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/jmoiron/sqlx" ) diff --git a/repository/job.go b/internal/repository/job.go similarity index 95% rename from repository/job.go rename to internal/repository/job.go index c7d65cf..dd34f51 100644 --- a/repository/job.go +++ b/internal/repository/job.go @@ -7,17 +7,23 @@ import ( "errors" "fmt" "strconv" + "sync" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" "github.com/iamlouk/lrucache" "github.com/jmoiron/sqlx" ) +var ( + jobRepoOnce sync.Once + jobRepoInstance *JobRepository +) + type JobRepository struct { DB *sqlx.DB @@ -25,10 +31,18 @@ type JobRepository struct { cache *lrucache.Cache } -func (r *JobRepository) Init() error { - r.stmtCache = sq.NewStmtCache(r.DB) - r.cache = lrucache.New(1024 * 1024) - return nil +func GetRepository() *JobRepository { + jobRepoOnce.Do(func() { + db := GetConnection() + + jobRepoInstance = &JobRepository{ + DB: db.DB, + stmtCache: sq.NewStmtCache(db.DB), + cache: lrucache.New(1024 * 1024), + } + }) + + return jobRepoInstance } var jobColumns []string = []string{ diff --git a/repository/job_test.go b/internal/repository/job_test.go similarity index 84% rename from repository/job_test.go rename to internal/repository/job_test.go index 5cf54bb..3f82d6b 100644 --- a/repository/job_test.go +++ b/internal/repository/job_test.go @@ -11,22 +11,11 @@ import ( var db 
*sqlx.DB func init() { - var err error - db, err = sqlx.Open("sqlite3", "../test/test.db") - if err != nil { - fmt.Println(err) - } + Connect("sqlite3", "../../test/test.db") } func setup(t *testing.T) *JobRepository { - r := &JobRepository{ - DB: db, - } - if err := r.Init(); err != nil { - t.Fatal(err) - } - - return r + return GetRepository() } func TestFind(t *testing.T) { diff --git a/repository/query.go b/internal/repository/query.go similarity index 96% rename from repository/query.go rename to internal/repository/query.go index 63c98aa..ae5b60b 100644 --- a/repository/query.go +++ b/internal/repository/query.go @@ -8,10 +8,10 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" ) diff --git a/repository/tags.go b/internal/repository/tags.go similarity index 97% rename from repository/tags.go rename to internal/repository/tags.go index 8e83bf1..411a5fc 100644 --- a/repository/tags.go +++ b/internal/repository/tags.go @@ -1,8 +1,8 @@ package repository import ( - "github.com/ClusterCockpit/cc-backend/metricdata" - "github.com/ClusterCockpit/cc-backend/schema" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" ) diff --git a/routes.go b/internal/routerConfig/routes.go similarity index 92% rename from routes.go rename to internal/routerConfig/routes.go index 9885b94..cb888eb 100644 --- a/routes.go +++ b/internal/routerConfig/routes.go @@ -1,4 +1,4 @@ -package main +package routerConfig import ( "fmt" @@ -8,13 +8,14 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/auth" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/graph" - "github.com/ClusterCockpit/cc-backend/graph/model" - "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/schema" - "github.com/ClusterCockpit/cc-backend/templates" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/templates" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/gorilla/mux" ) @@ -50,6 +51,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType { TotalJobs int RecentShortJobs int } + jobRepo := repository.GetRepository() runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ State: []schema.JobState{schema.JobStateRunning}, @@ -93,6 +95,7 @@ func setupJobRoute(i InfoType, r *http.Request) InfoType { } func setupUserRoute(i InfoType, r *http.Request) InfoType { + jobRepo := repository.GetRepository() username := mux.Vars(r)["id"] i["id"] = username i["username"] = username @@ -135,6 +138,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { func setupTaglistRoute(i InfoType, r *http.Request) InfoType { var username *string = nil + jobRepo 
:= repository.GetRepository() if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) { username = &user.Username } @@ -245,7 +249,7 @@ func buildFilterPresets(query url.Values) map[string]interface{} { return filterPresets } -func setupRoutes(router *mux.Router, routes []Route) { +func SetupRoutes(router *mux.Router) { for _, route := range routes { route := route router.HandleFunc(route.Route, func(rw http.ResponseWriter, r *http.Request) { diff --git a/runtimeSetup.go b/internal/runtimeEnv/setup.go similarity index 89% rename from runtimeSetup.go rename to internal/runtimeEnv/setup.go index f43e569..aa6aef3 100644 --- a/runtimeSetup.go +++ b/internal/runtimeEnv/setup.go @@ -1,4 +1,4 @@ -package main +package runtimeEnv import ( "bufio" @@ -15,7 +15,7 @@ import ( // Very simple and limited .env file reader. // All variable definitions found are directly // added to the processes environment. -func loadEnv(file string) error { +func LoadEnv(file string) error { f, err := os.Open(file) if err != nil { return err @@ -81,9 +81,9 @@ func loadEnv(file string) error { // specified in the config.json. The go runtime // takes care of all threads (and not only the calling one) // executing the underlying systemcall. -func dropPrivileges() error { - if programConfig.Group != "" { - g, err := user.LookupGroup(programConfig.Group) +func DropPrivileges(username string, group string) error { + if group != "" { + g, err := user.LookupGroup(group) if err != nil { return err } @@ -94,8 +94,8 @@ func dropPrivileges() error { } } - if programConfig.User != "" { - u, err := user.Lookup(programConfig.User) + if username != "" { + u, err := user.Lookup(username) if err != nil { return err } @@ -111,7 +111,7 @@ func dropPrivileges() error { // If started via systemd, inform systemd that we are running: // https://www.freedesktop.org/software/systemd/man/sd_notify.html -func systemdNotifiy(ready bool, status string) { +func SystemdNotifiy(ready bool, status string) { if os.Getenv("NOTIFY_SOCKET") == "" { // Not started using systemd return diff --git a/templates/templates.go b/internal/templates/templates.go similarity index 94% rename from templates/templates.go rename to internal/templates/templates.go index 0d0b956..31653b0 100644 --- a/templates/templates.go +++ b/internal/templates/templates.go @@ -5,8 +5,8 @@ import ( "net/http" "os" - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/log" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/log" ) var templatesDir string @@ -36,7 +36,7 @@ func init() { if ebp != "" { bp = ebp } - templatesDir = bp + "templates/" + templatesDir = bp + "web/templates/" base := template.Must(template.ParseFiles(templatesDir + "base.tmpl")) files := []string{ "home.tmpl", "404.tmpl", "login.tmpl", diff --git a/metricdata/influxdb-v2.go b/metricdata/influxdb-v2.go deleted file mode 100644 index 11a8235..0000000 --- a/metricdata/influxdb-v2.go +++ /dev/null @@ -1,308 +0,0 @@ -package metricdata - -import ( - "context" - "errors" - "fmt" - "log" - "strings" - "time" - "crypto/tls" - "encoding/json" - - "github.com/ClusterCockpit/cc-backend/config" - "github.com/ClusterCockpit/cc-backend/schema" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" -) - -type InfluxDBv2DataRepositoryConfig struct { - Url string `json:"url"` - Token string `json:"token"` - Bucket string `json:"bucket"` - Org string 
`json:"org"` - SkipTls bool `json:"skiptls"` -} - -type InfluxDBv2DataRepository struct { - client influxdb2.Client - queryClient influxdb2Api.QueryAPI - bucket, measurement string -} - -func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error { - var config InfluxDBv2DataRepositoryConfig - if err := json.Unmarshal(rawConfig, &config); err != nil { - return err - } - - idb.client = influxdb2.NewClientWithOptions(config.Url, config.Token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: config.SkipTls,} )) - idb.queryClient = idb.client.QueryAPI(config.Org) - idb.bucket = config.Bucket - - return nil -} - -func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string { - return t.Format(time.RFC3339) // Like “2006-01-02T15:04:05Z07:00” -} - -func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time { - return time.Unix(epoch, 0) -} - -func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { - - measurementsConds := make([]string, 0, len(metrics)) - for _, m := range metrics { - measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) - } - measurementsCond := strings.Join(measurementsConds, " or ") - - hostsConds := make([]string, 0, len(job.Resources)) - for _, h := range job.Resources { - if h.HWThreads != nil || h.Accelerators != nil { - // TODO - return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") - } - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) - } - hostsCond := strings.Join(hostsConds, " or ") - - jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC - // Requested Scopes - for _, scope := range scopes { - query := "" - switch scope { - case "node": - // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows - // log.Println("Note: Scope 'node' requested. ") - query = fmt.Sprintf(` - from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => (%s) and (%s) ) - |> drop(columns: ["_start", "_stop"]) - |> group(columns: ["hostname", "_measurement"]) - |> aggregateWindow(every: 60s, fn: mean) - |> drop(columns: ["_time"])`, - idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - measurementsCond, hostsCond) - case "socket": - log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ") - continue - case "core": - log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ") - continue - // Get Finest Granularity only, Set NULL to 0.0 - // query = fmt.Sprintf(` - // from(bucket: "%s") - // |> range(start: %s, stop: %s) - // |> filter(fn: (r) => %s ) - // |> filter(fn: (r) => %s ) - // |> drop(columns: ["_start", "_stop", "cluster"]) - // |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, - // idb.bucket, - // idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - // measurementsCond, hostsCond) - default: - log.Println("Note: Unknown Scope requested: Will return 'node' scope. 
") - continue - // return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node'") - } - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - return nil, err - } - - // Init Metrics: Only Node level now -> TODO: Matching /check on scope level ... - for _, metric := range metrics { - jobMetric, ok := jobData[metric] - if !ok { - mc := config.GetMetricConfig(job.Cluster, metric) - jobMetric = map[schema.MetricScope]*schema.JobMetric{ - scope: { // uses scope var from above! - Unit: mc.Unit, - Scope: scope, - Timestep: mc.Timestep, - Series: make([]schema.Series, 0, len(job.Resources)), - StatisticsSeries: nil, // Should be: &schema.StatsSeries{}, - }, - } - } - jobData[metric] = jobMetric - } - - // Process Result: Time-Data - field, host, hostSeries := "", "", schema.Series{} - // typeId := 0 - switch scope { - case "node": - for rows.Next() { - row := rows.Record() - if ( host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() ) { - if ( host != "" ) { - // Append Series before reset - jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - } - field, host = row.Measurement(), row.ValueByKey("hostname").(string) - hostSeries = schema.Series{ - Hostname: host, - Statistics: nil, - Data: make([]schema.Float, 0), - } - } - val, ok := row.Value().(float64) - if ok { - hostSeries.Data = append(hostSeries.Data, schema.Float(val)) - } else { - hostSeries.Data = append(hostSeries.Data, schema.Float(0)) - } - } - case "socket": - continue - case "core": - continue - // Include Series.Id in hostSeries - // for rows.Next() { - // row := rows.Record() - // if ( host == "" || host != row.ValueByKey("hostname").(string) || typeId != row.ValueByKey("type-id").(int) || rows.TableChanged() ) { - // if ( host != "" ) { - // // Append Series before reset - // jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - // } - // field, host, typeId = row.Measurement(), row.ValueByKey("hostname").(string), row.ValueByKey("type-id").(int) - // hostSeries = schema.Series{ - // Hostname: host, - // Id: &typeId, - // Statistics: nil, - // Data: make([]schema.Float, 0), - // } - // } - // val := row.Value().(float64) - // hostSeries.Data = append(hostSeries.Data, schema.Float(val)) - // } - default: - continue - // return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'") - } - // Append last Series - jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) - } - - // Get Stats - stats, err := idb.LoadStats(job, metrics, ctx) - if err != nil { - return nil, err - } - - for _, scope := range scopes { - if scope == "node" { // No 'socket/core' support yet - for metric, nodes := range stats { - // log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric)) - for node, stats := range nodes { - // log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg )) - for index, _ := range jobData[metric][scope].Series { - // log.Println(fmt.Sprintf("<< Try to add Stats to Series in Position %d >>", index)) - if jobData[metric][scope].Series[index].Hostname == node { - // log.Println(fmt.Sprintf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname)) - jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max} - // 
log.Println(fmt.Sprintf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg)) - } - } - } - } - } - } - - // DEBUG: - // for _, scope := range scopes { - // for _, met := range metrics { - // for _, series := range jobData[met][scope].Series { - // log.Println(fmt.Sprintf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>", - // len(series.Data), met, series.Hostname, scope, - // series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)) - // } - // } - // } - - return jobData, nil -} - -func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { - - stats := map[string]map[string]schema.MetricStatistics{} - - hostsConds := make([]string, 0, len(job.Resources)) - for _, h := range job.Resources { - if h.HWThreads != nil || h.Accelerators != nil { - // TODO - return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") - } - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) - } - hostsCond := strings.Join(hostsConds, " or ") - - // lenMet := len(metrics) - - for _, metric := range metrics { - // log.Println(fmt.Sprintf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet)) - - query := fmt.Sprintf(` - data = from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "%s" and r._field == "value" and (%s)) - union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), - data |> min(column: "_value") |> set(key: "_field", value: "min"), - data |> max(column: "_value") |> set(key: "_field", value: "max")]) - |> pivot(rowKey: ["hostname"], columnKey: ["_field"], valueColumn: "_value") - |> group()`, - idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - metric, hostsCond) - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - return nil, err - } - - nodes := map[string]schema.MetricStatistics{} - for rows.Next() { - row := rows.Record() - host := row.ValueByKey("hostname").(string) - - avg, avgok := row.ValueByKey("avg").(float64) - if !avgok { - // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg)) - avg = 0.0 - } - min, minok := row.ValueByKey("min").(float64) - if !minok { - // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min)) - min = 0.0 - } - max, maxok := row.ValueByKey("max").(float64) - if !maxok { - // log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MAX. 
Expected 'float64', got %v", metric, max)) - max = 0.0 - } - - nodes[host] = schema.MetricStatistics{ - Avg: avg, - Min: min, - Max: max, - } - } - stats[metric] = nodes - } - - return stats, nil -} - -func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { - // TODO : Implement to be used in Analysis- und System/Node-View - log.Println(fmt.Sprintf("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)) - - return nil, errors.New("unimplemented for InfluxDBv2DataRepository") -} diff --git a/log/log.go b/pkg/log/log.go similarity index 100% rename from log/log.go rename to pkg/log/log.go diff --git a/schema/float.go b/pkg/schema/float.go similarity index 100% rename from schema/float.go rename to pkg/schema/float.go diff --git a/schema/job.go b/pkg/schema/job.go similarity index 100% rename from schema/job.go rename to pkg/schema/job.go diff --git a/schema/metrics.go b/pkg/schema/metrics.go similarity index 100% rename from schema/metrics.go rename to pkg/schema/metrics.go diff --git a/templates/404.tmpl b/web/templates/404.tmpl similarity index 100% rename from templates/404.tmpl rename to web/templates/404.tmpl diff --git a/templates/base.tmpl b/web/templates/base.tmpl similarity index 100% rename from templates/base.tmpl rename to web/templates/base.tmpl diff --git a/templates/config.tmpl b/web/templates/config.tmpl similarity index 100% rename from templates/config.tmpl rename to web/templates/config.tmpl diff --git a/templates/home.tmpl b/web/templates/home.tmpl similarity index 100% rename from templates/home.tmpl rename to web/templates/home.tmpl diff --git a/templates/imprint.tmpl b/web/templates/imprint.tmpl similarity index 100% rename from templates/imprint.tmpl rename to web/templates/imprint.tmpl diff --git a/templates/login.tmpl b/web/templates/login.tmpl similarity index 100% rename from templates/login.tmpl rename to web/templates/login.tmpl diff --git a/templates/monitoring/analysis.tmpl b/web/templates/monitoring/analysis.tmpl similarity index 100% rename from templates/monitoring/analysis.tmpl rename to web/templates/monitoring/analysis.tmpl diff --git a/templates/monitoring/job.tmpl b/web/templates/monitoring/job.tmpl similarity index 100% rename from templates/monitoring/job.tmpl rename to web/templates/monitoring/job.tmpl diff --git a/templates/monitoring/jobs.tmpl b/web/templates/monitoring/jobs.tmpl similarity index 100% rename from templates/monitoring/jobs.tmpl rename to web/templates/monitoring/jobs.tmpl diff --git a/templates/monitoring/list.tmpl b/web/templates/monitoring/list.tmpl similarity index 100% rename from templates/monitoring/list.tmpl rename to web/templates/monitoring/list.tmpl diff --git a/templates/monitoring/node.tmpl b/web/templates/monitoring/node.tmpl similarity index 100% rename from templates/monitoring/node.tmpl rename to web/templates/monitoring/node.tmpl diff --git a/templates/monitoring/status.tmpl b/web/templates/monitoring/status.tmpl similarity index 100% rename from templates/monitoring/status.tmpl rename to web/templates/monitoring/status.tmpl diff --git a/templates/monitoring/systems.tmpl b/web/templates/monitoring/systems.tmpl similarity index 100% rename from templates/monitoring/systems.tmpl rename to web/templates/monitoring/systems.tmpl diff --git a/templates/monitoring/taglist.tmpl 
b/web/templates/monitoring/taglist.tmpl similarity index 100% rename from templates/monitoring/taglist.tmpl rename to web/templates/monitoring/taglist.tmpl diff --git a/templates/monitoring/user.tmpl b/web/templates/monitoring/user.tmpl similarity index 100% rename from templates/monitoring/user.tmpl rename to web/templates/monitoring/user.tmpl diff --git a/templates/privacy.tmpl b/web/templates/privacy.tmpl similarity index 100% rename from templates/privacy.tmpl rename to web/templates/privacy.tmpl
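
Taken together, the main() hunks at the top of this diff replace the package-level jobRepo variable and the inline sqlx setup with two sync.Once-guarded singletons from internal/repository. A minimal sketch of the resulting initialization order for code living inside this module, mirroring what cmd/cc-backend/main.go now does (the database path is an assumption):

    package main

    import (
        "log"

        "github.com/ClusterCockpit/cc-backend/internal/repository"
        _ "github.com/mattn/go-sqlite3" // driver must be registered before Connect
    )

    func main() {
        // Connect runs its body at most once; the driver-specific pool
        // tuning that used to live in main() is now encapsulated here.
        repository.Connect("sqlite3", "./var/job.db") // hypothetical path

        db := repository.GetConnection()      // *DBConnection wrapping *sqlx.DB
        jobRepo := repository.GetRepository() // JobRepository built lazily on first use

        log.Printf("connection ready: %v, repository ready: %v", db != nil, jobRepo != nil)
    }

The same entry point is what internal/repository/job_test.go now calls with Connect("sqlite3", "../../test/test.db"), so the tests and the server share a single initialization path instead of each wiring up sqlx by hand.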