mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-09-07 01:03:00 +02:00
Merge branch 'master' into 97_107_mark_and_show_shared
This commit is contained in:
416
internal/api/api_test.go
Normal file
416
internal/api/api_test.go
Normal file
@@ -0,0 +1,416 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package api_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/api"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func setup(t *testing.T) *api.RestApi {
|
||||
const testconfig = `{
|
||||
"addr": "0.0.0.0:8080",
|
||||
"validate": false,
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"clusters": [
|
||||
{
|
||||
"name": "testcluster",
|
||||
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
|
||||
"filterRanges": {
|
||||
"numNodes": { "from": 1, "to": 64 },
|
||||
"duration": { "from": 0, "to": 86400 },
|
||||
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
const testclusterJson = `{
|
||||
"name": "testcluster",
|
||||
"subClusters": [
|
||||
{
|
||||
"name": "sc1",
|
||||
"nodes": "host123,host124,host125",
|
||||
"processorType": "Intel Core i7-4770",
|
||||
"socketsPerNode": 1,
|
||||
"coresPerSocket": 4,
|
||||
"threadsPerCore": 2,
|
||||
"flopRateScalar": {
|
||||
"unit": {
|
||||
"prefix": "G",
|
||||
"base": "F/s"
|
||||
},
|
||||
"value": 14
|
||||
},
|
||||
"flopRateSimd": {
|
||||
"unit": {
|
||||
"prefix": "G",
|
||||
"base": "F/s"
|
||||
},
|
||||
"value": 112
|
||||
},
|
||||
"memoryBandwidth": {
|
||||
"unit": {
|
||||
"prefix": "G",
|
||||
"base": "B/s"
|
||||
},
|
||||
"value": 24
|
||||
},
|
||||
"numberOfNodes": 70,
|
||||
"topology": {
|
||||
"node": [0, 1, 2, 3, 4, 5, 6, 7],
|
||||
"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
|
||||
"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
|
||||
"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
|
||||
"core": [[0], [1], [2], [3], [4], [5], [6], [7]]
|
||||
}
|
||||
}
|
||||
],
|
||||
"metricConfig": [
|
||||
{
|
||||
"name": "load_one",
|
||||
"unit": { "base": ""},
|
||||
"scope": "node",
|
||||
"timestep": 60,
|
||||
"aggregation": "avg",
|
||||
"peak": 8,
|
||||
"normal": 0,
|
||||
"caution": 0,
|
||||
"alert": 0
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
log.Init("info", true)
|
||||
tmpdir := t.TempDir()
|
||||
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||
if err := os.Mkdir(jobarchive, 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dbfilepath := filepath.Join(tmpdir, "test.db")
|
||||
err := repository.MigrateDB("sqlite3", dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfgFilePath := filepath.Join(tmpdir, "config.json")
|
||||
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config.Init(cfgFilePath)
|
||||
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
|
||||
|
||||
repository.Connect("sqlite3", dbfilepath)
|
||||
db := repository.GetConnection()
|
||||
|
||||
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
jobRepo := repository.GetJobRepository()
|
||||
resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo}
|
||||
|
||||
return &api.RestApi{
|
||||
JobRepository: resolver.Repo,
|
||||
Resolver: resolver,
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup is the teardown hook registered by the tests in this file.
// It is currently a placeholder and performs no work.
func cleanup() {
	// TODO: clear all caches, reset all modules, etc.
}
|
||||
|
||||
/*
|
||||
* This function starts a job, stops it, and then reads its data from the job-archive.
|
||||
* Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
|
||||
* at least `setup` modifies global state.
|
||||
*/
|
||||
func TestRestApi(t *testing.T) {
|
||||
restapi := setup(t)
|
||||
t.Cleanup(cleanup)
|
||||
|
||||
testData := schema.JobData{
|
||||
"load_one": map[schema.MetricScope]*schema.JobMetric{
|
||||
schema.MetricScopeNode: {
|
||||
Unit: schema.Unit{Base: "load"},
|
||||
Timestep: 60,
|
||||
Series: []schema.Series{
|
||||
{
|
||||
Hostname: "host123",
|
||||
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
|
||||
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
|
||||
return testData, nil
|
||||
}
|
||||
|
||||
r := mux.NewRouter()
|
||||
restapi.MountRoutes(r)
|
||||
|
||||
const startJobBody string = `{
|
||||
"jobId": 123,
|
||||
"user": "testuser",
|
||||
"project": "testproj",
|
||||
"cluster": "testcluster",
|
||||
"partition": "default",
|
||||
"walltime": 3600,
|
||||
"arrayJobId": 0,
|
||||
"numNodes": 1,
|
||||
"numHwthreads": 8,
|
||||
"numAcc": 0,
|
||||
"exclusive": 1,
|
||||
"monitoringStatus": 1,
|
||||
"smt": 1,
|
||||
"tags": [{ "type": "testTagType", "name": "testTagName" }],
|
||||
"resources": [
|
||||
{
|
||||
"hostname": "host123",
|
||||
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
|
||||
}
|
||||
],
|
||||
"metaData": { "jobScript": "blablabla..." },
|
||||
"startTime": 123456789
|
||||
}`
|
||||
|
||||
var dbid int64
|
||||
if ok := t.Run("StartJob", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody)))
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
r.ServeHTTP(recorder, req)
|
||||
response := recorder.Result()
|
||||
if response.StatusCode != http.StatusCreated {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
var res api.StartJobApiResponse
|
||||
if err := json.Unmarshal(recorder.Body.Bytes(), &res); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(res.DBID)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
job.Tags, err = restapi.Resolver.Job().Tags(context.Background(), job)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if job.JobID != 123 ||
|
||||
job.User != "testuser" ||
|
||||
job.Project != "testproj" ||
|
||||
job.Cluster != "testcluster" ||
|
||||
job.SubCluster != "sc1" ||
|
||||
job.Partition != "default" ||
|
||||
job.Walltime != 3600 ||
|
||||
job.ArrayJobId != 0 ||
|
||||
job.NumNodes != 1 ||
|
||||
job.NumHWThreads != 8 ||
|
||||
job.NumAcc != 0 ||
|
||||
job.Exclusive != 1 ||
|
||||
job.MonitoringStatus != 1 ||
|
||||
job.SMT != 1 ||
|
||||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
|
||||
job.StartTime.Unix() != 123456789 {
|
||||
t.Fatalf("unexpected job properties: %#v", job)
|
||||
}
|
||||
|
||||
if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" {
|
||||
t.Fatalf("unexpected tags: %#v", job.Tags)
|
||||
}
|
||||
|
||||
dbid = res.DBID
|
||||
}); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
const stopJobBody string = `{
|
||||
"jobId": 123,
|
||||
"startTime": 123456789,
|
||||
"cluster": "testcluster",
|
||||
|
||||
"jobState": "completed",
|
||||
"stopTime": 123457789
|
||||
}`
|
||||
|
||||
var stoppedJob *schema.Job
|
||||
if ok := t.Run("StopJob", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody)))
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
r.ServeHTTP(recorder, req)
|
||||
response := recorder.Result()
|
||||
if response.StatusCode != http.StatusOK {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
restapi.JobRepository.WaitForArchiving()
|
||||
job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(dbid)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if job.State != schema.JobStateCompleted {
|
||||
t.Fatal("expected job to be completed")
|
||||
}
|
||||
|
||||
if job.Duration != (123457789 - 123456789) {
|
||||
t.Fatalf("unexpected job properties: %#v", job)
|
||||
}
|
||||
|
||||
job.MetaData, err = restapi.JobRepository.FetchMetadata(job)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(job.MetaData, map[string]string{"jobScript": "blablabla..."}) {
|
||||
t.Fatalf("unexpected job.metaData: %#v", job.MetaData)
|
||||
}
|
||||
|
||||
stoppedJob = job
|
||||
}); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
t.Run("CheckArchive", func(t *testing.T) {
|
||||
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(data, testData) {
|
||||
t.Fatal("unexpected data fetched from archive")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CheckDoubleStart", func(t *testing.T) {
|
||||
// Starting a job with the same jobId and cluster should only be allowed if the startTime is far appart!
|
||||
body := strings.Replace(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`, -1)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/jobs/start_job/", bytes.NewBuffer([]byte(body)))
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
r.ServeHTTP(recorder, req)
|
||||
response := recorder.Result()
|
||||
if response.StatusCode != http.StatusUnprocessableEntity {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
})
|
||||
|
||||
const startJobBodyFailed string = `{
|
||||
"jobId": 12345,
|
||||
"user": "testuser",
|
||||
"project": "testproj",
|
||||
"cluster": "testcluster",
|
||||
"partition": "default",
|
||||
"walltime": 3600,
|
||||
"numNodes": 1,
|
||||
"exclusive": 1,
|
||||
"monitoringStatus": 1,
|
||||
"smt": 1,
|
||||
"resources": [
|
||||
{
|
||||
"hostname": "host123"
|
||||
}
|
||||
],
|
||||
"startTime": 12345678
|
||||
}`
|
||||
|
||||
ok := t.Run("StartJobFailed", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/jobs/start_job/", bytes.NewBuffer([]byte(startJobBodyFailed)))
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
r.ServeHTTP(recorder, req)
|
||||
response := recorder.Result()
|
||||
if response.StatusCode != http.StatusCreated {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
})
|
||||
if !ok {
|
||||
t.Fatal("subtest failed")
|
||||
}
|
||||
|
||||
const stopJobBodyFailed string = `{
|
||||
"jobId": 12345,
|
||||
"cluster": "testcluster",
|
||||
|
||||
"jobState": "failed",
|
||||
"stopTime": 12355678
|
||||
}`
|
||||
|
||||
ok = t.Run("StopJobFailed", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBodyFailed)))
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
r.ServeHTTP(recorder, req)
|
||||
response := recorder.Result()
|
||||
if response.StatusCode != http.StatusOK {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
restapi.JobRepository.WaitForArchiving()
|
||||
jobid, cluster := int64(12345), "testcluster"
|
||||
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if job.State != schema.JobStateFailed {
|
||||
t.Fatal("expected job to be failed")
|
||||
}
|
||||
})
|
||||
if !ok {
|
||||
t.Fatal("subtest failed")
|
||||
}
|
||||
}
|
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
@@ -252,7 +253,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
results := make([]*schema.JobMeta, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
if withMetadata {
|
||||
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
@@ -396,7 +397,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
if req.State == "" {
|
||||
req.State = schema.JobStateRunning
|
||||
}
|
||||
if err := repository.SanityChecks(&req.BaseJob); err != nil {
|
||||
if err := importer.SanityChecks(&req.BaseJob); err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
@@ -3420,9 +3420,9 @@ func (ec *executionContext) _Job_walltime(ctx context.Context, field graphql.Col
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*int64)
|
||||
res := resTmp.(int64)
|
||||
fc.Result = res
|
||||
return ec.marshalNInt2ᚖint64(ctx, field.Selections, res)
|
||||
return ec.marshalNInt2int64(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_walltime(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -3508,9 +3508,9 @@ func (ec *executionContext) _Job_numHWThreads(ctx context.Context, field graphql
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*int32)
|
||||
res := resTmp.(int32)
|
||||
fc.Result = res
|
||||
return ec.marshalNInt2ᚖint32(ctx, field.Selections, res)
|
||||
return ec.marshalNInt2int32(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_numHWThreads(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -3552,9 +3552,9 @@ func (ec *executionContext) _Job_numAcc(ctx context.Context, field graphql.Colle
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*int32)
|
||||
res := resTmp.(int32)
|
||||
fc.Result = res
|
||||
return ec.marshalNInt2ᚖint32(ctx, field.Selections, res)
|
||||
return ec.marshalNInt2int32(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_numAcc(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -3596,9 +3596,9 @@ func (ec *executionContext) _Job_SMT(ctx context.Context, field graphql.Collecte
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*int32)
|
||||
res := resTmp.(int32)
|
||||
fc.Result = res
|
||||
return ec.marshalNInt2ᚖint32(ctx, field.Selections, res)
|
||||
return ec.marshalNInt2int32(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_SMT(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -3684,9 +3684,9 @@ func (ec *executionContext) _Job_partition(ctx context.Context, field graphql.Co
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*string)
|
||||
res := resTmp.(string)
|
||||
fc.Result = res
|
||||
return ec.marshalNString2ᚖstring(ctx, field.Selections, res)
|
||||
return ec.marshalNString2string(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_partition(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -3728,9 +3728,9 @@ func (ec *executionContext) _Job_arrayJobId(ctx context.Context, field graphql.C
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*int64)
|
||||
res := resTmp.(int64)
|
||||
fc.Result = res
|
||||
return ec.marshalNInt2ᚖint64(ctx, field.Selections, res)
|
||||
return ec.marshalNInt2int64(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Job_arrayJobId(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -9034,9 +9034,9 @@ func (ec *executionContext) _Unit_prefix(ctx context.Context, field graphql.Coll
|
||||
if resTmp == nil {
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*string)
|
||||
res := resTmp.(string)
|
||||
fc.Result = res
|
||||
return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
|
||||
return ec.marshalOString2string(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Unit_prefix(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
@@ -14020,48 +14020,6 @@ func (ec *executionContext) marshalNInt2ᚖint(ctx context.Context, sel ast.Sele
|
||||
return res
|
||||
}
|
||||
|
||||
func (ec *executionContext) unmarshalNInt2ᚖint32(ctx context.Context, v interface{}) (*int32, error) {
|
||||
res, err := graphql.UnmarshalInt32(v)
|
||||
return &res, graphql.ErrorOnPath(ctx, err)
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNInt2ᚖint32(ctx context.Context, sel ast.SelectionSet, v *int32) graphql.Marshaler {
|
||||
if v == nil {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := graphql.MarshalInt32(*v)
|
||||
if res == graphql.Null {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (ec *executionContext) unmarshalNInt2ᚖint64(ctx context.Context, v interface{}) (*int64, error) {
|
||||
res, err := graphql.UnmarshalInt64(v)
|
||||
return &res, graphql.ErrorOnPath(ctx, err)
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNInt2ᚖint64(ctx context.Context, sel ast.SelectionSet, v *int64) graphql.Marshaler {
|
||||
if v == nil {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := graphql.MarshalInt64(*v)
|
||||
if res == graphql.Null {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNJob2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐJobᚄ(ctx context.Context, sel ast.SelectionSet, v []*schema.Job) graphql.Marshaler {
|
||||
ret := make(graphql.Array, len(v))
|
||||
var wg sync.WaitGroup
|
||||
@@ -14684,27 +14642,6 @@ func (ec *executionContext) marshalNString2ᚕstringᚄ(ctx context.Context, sel
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ec *executionContext) unmarshalNString2ᚖstring(ctx context.Context, v interface{}) (*string, error) {
|
||||
res, err := graphql.UnmarshalString(v)
|
||||
return &res, graphql.ErrorOnPath(ctx, err)
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNString2ᚖstring(ctx context.Context, sel ast.SelectionSet, v *string) graphql.Marshaler {
|
||||
if v == nil {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := graphql.MarshalString(*v)
|
||||
if res == graphql.Null {
|
||||
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNSubCluster2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐSubClusterᚄ(ctx context.Context, sel ast.SelectionSet, v []*schema.SubCluster) graphql.Marshaler {
|
||||
ret := make(graphql.Array, len(v))
|
||||
var wg sync.WaitGroup
|
||||
|
131
internal/importer/handleImport.go
Normal file
131
internal/importer/handleImport.go
Normal file
@@ -0,0 +1,131 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package importer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
|
||||
func HandleImportFlag(flag string) error {
|
||||
r := repository.GetJobRepository()
|
||||
|
||||
for _, pair := range strings.Split(flag, ",") {
|
||||
files := strings.Split(pair, ":")
|
||||
if len(files) != 2 {
|
||||
return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
|
||||
}
|
||||
|
||||
raw, err := os.ReadFile(files[0])
|
||||
if err != nil {
|
||||
log.Warn("Error while reading metadata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err = schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
|
||||
}
|
||||
}
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
if err = dec.Decode(&jobMeta); err != nil {
|
||||
log.Warn("Error while decoding raw json metadata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
raw, err = os.ReadFile(files[1])
|
||||
if err != nil {
|
||||
log.Warn("Error while reading jobdata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err = schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
dec = json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobData := schema.JobData{}
|
||||
if err = dec.Decode(&jobData); err != nil {
|
||||
log.Warn("Error while decoding raw json jobdata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
// checkJobData(&jobData)
|
||||
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
|
||||
// if _, err = r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
|
||||
// if err != nil {
|
||||
// log.Warn("Error while finding job in jobRepository")
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist")
|
||||
// }
|
||||
//
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
// TODO: Other metrics...
|
||||
job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
|
||||
job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
|
||||
job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
|
||||
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Warn("Error while marshaling job resources")
|
||||
return err
|
||||
}
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Warn("Error while marshaling job metadata")
|
||||
return err
|
||||
}
|
||||
|
||||
if err = SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Warn("BaseJob SanityChecks failed")
|
||||
return err
|
||||
}
|
||||
|
||||
if err = archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
|
||||
log.Error("Error while importing job")
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := r.InsertJob(&job)
|
||||
if err != nil {
|
||||
log.Warn("Error while job db insert")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
log.Error("Error while adding or creating tag")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
|
||||
}
|
||||
return nil
|
||||
}
|
172
internal/importer/importer_test.go
Normal file
172
internal/importer/importer_test.go
Normal file
@@ -0,0 +1,172 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package importer_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
// copyFile copies the contents of the file at path s to the file at path d.
// The destination is created if it does not exist and truncated otherwise.
//
// It returns the first error encountered while opening, copying or closing.
// On a copy error the destination may be left partially written.
func copyFile(s string, d string) error {
	r, err := os.Open(s)
	if err != nil {
		return err
	}
	defer r.Close()

	w, err := os.Create(d)
	if err != nil {
		return err
	}

	if _, err := w.ReadFrom(r); err != nil {
		// The copy error is the one worth reporting; closing is best effort.
		w.Close()
		return err
	}

	// Close explicitly (not deferred) so that an error surfacing on close is
	// returned to the caller instead of being dropped.
	return w.Close()
}
|
||||
|
||||
func setup(t *testing.T) *repository.JobRepository {
|
||||
const testconfig = `{
|
||||
"addr": "0.0.0.0:8080",
|
||||
"validate": false,
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"clusters": [
|
||||
{
|
||||
"name": "testcluster",
|
||||
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
|
||||
"filterRanges": {
|
||||
"numNodes": { "from": 1, "to": 64 },
|
||||
"duration": { "from": 0, "to": 86400 },
|
||||
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "fritz",
|
||||
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
|
||||
"filterRanges": {
|
||||
"numNodes": { "from": 1, "to": 944 },
|
||||
"duration": { "from": 0, "to": 86400 },
|
||||
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "taurus",
|
||||
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
|
||||
"filterRanges": {
|
||||
"numNodes": { "from": 1, "to": 4000 },
|
||||
"duration": { "from": 0, "to": 604800 },
|
||||
"startTime": { "from": "2010-01-01T00:00:00Z", "to": null }
|
||||
}
|
||||
}
|
||||
]}`
|
||||
|
||||
log.Init("info", true)
|
||||
tmpdir := t.TempDir()
|
||||
|
||||
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||
if err := os.Mkdir(jobarchive, 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fritzArchive := filepath.Join(tmpdir, "job-archive", "fritz")
|
||||
if err := os.Mkdir(fritzArchive, 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := copyFile(filepath.Join("testdata", "cluster-fritz.json"),
|
||||
filepath.Join(fritzArchive, "cluster.json")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dbfilepath := filepath.Join(tmpdir, "test.db")
|
||||
err := repository.MigrateDB("sqlite3", dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfgFilePath := filepath.Join(tmpdir, "config.json")
|
||||
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config.Init(cfgFilePath)
|
||||
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
|
||||
|
||||
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
repository.Connect("sqlite3", dbfilepath)
|
||||
return repository.GetJobRepository()
|
||||
}
|
||||
|
||||
// Result holds the expected values for one import test case, as decoded from
// the test's golden file. JobId, Cluster and StartTime are used to look the
// imported job up again; Duration is compared against the stored job.
type Result struct {
	JobId     int64
	Cluster   string
	StartTime int64
	Duration  int32
}

// readResult loads and decodes the golden file
// testdata/<testname>-golden.json. The calling test is aborted via t.Fatal
// when the file cannot be read or does not contain valid JSON for Result.
func readResult(t *testing.T, testname string) Result {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	var r Result

	content, err := os.ReadFile(filepath.Join("testdata",
		fmt.Sprintf("%s-golden.json", testname)))
	if err != nil {
		t.Fatal("Error when opening file: ", err)
	}

	err = json.Unmarshal(content, &r)
	if err != nil {
		t.Fatal("Error during Unmarshal(): ", err)
	}

	return r
}
|
||||
|
||||
func TestHandleImportFlag(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
tests, err := filepath.Glob(filepath.Join("testdata", "*.input"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, path := range tests {
|
||||
_, filename := filepath.Split(path)
|
||||
str := strings.Split(strings.TrimSuffix(filename, ".input"), "-")
|
||||
testname := str[1]
|
||||
|
||||
t.Run(testname, func(t *testing.T) {
|
||||
s := fmt.Sprintf("%s:%s", filepath.Join("testdata",
|
||||
fmt.Sprintf("meta-%s.input", testname)),
|
||||
filepath.Join("testdata", fmt.Sprintf("data-%s.json", testname)))
|
||||
err := importer.HandleImportFlag(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
result := readResult(t, testname)
|
||||
job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if job.Duration != result.Duration {
|
||||
t.Errorf("wrong duration for job\ngot: %d \nwant: %d", job.Duration, result.Duration)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
197
internal/importer/initDB.go
Normal file
197
internal/importer/initDB.go
Normal file
@@ -0,0 +1,197 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package importer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// Delete the tables "job", "tag" and "jobtag" from the database and
|
||||
// repopulate them using the jobs found in `archive`.
|
||||
func InitDB() error {
|
||||
r := repository.GetJobRepository()
|
||||
if err := r.Flush(); err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
return err
|
||||
}
|
||||
starttime := time.Now()
|
||||
log.Print("Building job table...")
|
||||
|
||||
t, err := r.TransactionInit()
|
||||
if err != nil {
|
||||
log.Warn("Error while initializing SQL transactions")
|
||||
return err
|
||||
}
|
||||
tags := make(map[string]int64)
|
||||
|
||||
// Not using log.Print because we want the line to end with `\r` and
|
||||
// this function is only ever called when a special command line flag
|
||||
// is passed anyways.
|
||||
fmt.Printf("%d jobs inserted...\r", 0)
|
||||
|
||||
ar := archive.GetHandle()
|
||||
i := 0
|
||||
errorOccured := 0
|
||||
|
||||
for jobContainer := range ar.Iter(false) {
|
||||
|
||||
jobMeta := jobContainer.Meta
|
||||
|
||||
// Bundle 100 inserts into one transaction for better performance
|
||||
if i%100 == 0 {
|
||||
r.TransactionCommit(t)
|
||||
fmt.Printf("%d jobs inserted...\r", i)
|
||||
}
|
||||
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
// TODO: Other metrics...
|
||||
job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
|
||||
job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
|
||||
job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
|
||||
job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
|
||||
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
id, err := r.TransactionAdd(t, job)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
tagstr := tag.Name + ":" + tag.Type
|
||||
tagId, ok := tags[tagstr]
|
||||
if !ok {
|
||||
tagId, err = r.TransactionAddTag(t, tag)
|
||||
if err != nil {
|
||||
log.Errorf("Error adding tag: %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
tags[tagstr] = tagId
|
||||
}
|
||||
|
||||
r.TransactionSetTag(t, id, tagId)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
if errorOccured > 0 {
|
||||
log.Warnf("Error in import of %d jobs!", errorOccured)
|
||||
}
|
||||
|
||||
r.TransactionEnd(t)
|
||||
log.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
|
||||
return nil
|
||||
}
|
||||
|
||||
// This function also sets the subcluster if necessary!
|
||||
func SanityChecks(job *schema.BaseJob) error {
|
||||
if c := archive.GetCluster(job.Cluster); c == nil {
|
||||
return fmt.Errorf("no such cluster: %v", job.Cluster)
|
||||
}
|
||||
if err := archive.AssignSubCluster(job); err != nil {
|
||||
log.Warn("Error while assigning subcluster to job")
|
||||
return err
|
||||
}
|
||||
if !job.State.Valid() {
|
||||
return fmt.Errorf("not a valid job state: %v", job.State)
|
||||
}
|
||||
if len(job.Resources) == 0 || len(job.User) == 0 {
|
||||
return fmt.Errorf("'resources' and 'user' should not be empty")
|
||||
}
|
||||
if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 {
|
||||
return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
|
||||
}
|
||||
if len(job.Resources) != int(job.NumNodes) {
|
||||
return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadJobStat(job *schema.JobMeta, metric string) float64 {
|
||||
if stats, ok := job.Statistics[metric]; ok {
|
||||
return stats.Avg
|
||||
}
|
||||
|
||||
return 0.0
|
||||
}
|
||||
|
||||
func checkJobData(d *schema.JobData) error {
|
||||
for _, scopes := range *d {
|
||||
// var newUnit schema.Unit
|
||||
// TODO Add node scope if missing
|
||||
for _, metric := range scopes {
|
||||
if strings.Contains(metric.Unit.Base, "B/s") ||
|
||||
strings.Contains(metric.Unit.Base, "F/s") ||
|
||||
strings.Contains(metric.Unit.Base, "B") {
|
||||
|
||||
// get overall avg
|
||||
sum := 0.0
|
||||
for _, s := range metric.Series {
|
||||
sum += s.Statistics.Avg
|
||||
}
|
||||
|
||||
avg := sum / float64(len(metric.Series))
|
||||
f, p := Normalize(avg, metric.Unit.Prefix)
|
||||
|
||||
if p != metric.Unit.Prefix {
|
||||
|
||||
fmt.Printf("Convert %e", f)
|
||||
// for _, s := range metric.Series {
|
||||
// fp := schema.ConvertFloatToFloat64(s.Data)
|
||||
//
|
||||
// for i := 0; i < len(fp); i++ {
|
||||
// fp[i] *= f
|
||||
// fp[i] = math.Ceil(fp[i])
|
||||
// }
|
||||
//
|
||||
// s.Data = schema.GetFloat64ToFloat(fp)
|
||||
// }
|
||||
|
||||
metric.Unit.Prefix = p
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
58
internal/importer/normalize.go
Normal file
58
internal/importer/normalize.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package importer
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
ccunits "github.com/ClusterCockpit/cc-units"
|
||||
)
|
||||
|
||||
// getNormalizationFactor determines by which power of 1000 the value v has
// to be scaled so that its magnitude lies in the range [1, 1000].
//
// It returns the scaling factor and its decimal exponent. The exponent is
// negative if v has to be scaled down (|v| > 1000), positive if v has to be
// scaled up (|v| < 1) and 0 if no scaling is required.
//
// Zero, NaN and infinite values cannot be brought into that range; for them
// the neutral result (1.0, 0) is returned. Negative values are normalized by
// their magnitude.
func getNormalizationFactor(v float64) (float64, int) {
	// Guard against inputs for which the scaling loops below would never
	// terminate (0 stays 0, Inf stays Inf) or make no sense (NaN).
	if v == 0 || math.IsNaN(v) || math.IsInf(v, 0) {
		return 1.0, 0
	}
	v = math.Abs(v)

	count := 0
	scale := -3

	if v > 1000.0 {
		for v > 1000.0 {
			v *= 1e-3
			count++
		}
	} else {
		for v < 1.0 {
			v *= 1e3
			count++
		}
		scale = 3
	}

	return math.Pow10(count * scale), count * scale
}
|
||||
|
||||
// getExponent returns the decimal exponent, as a multiple of 3, of the
// prefix factor p: 3 for 1e3, 9 for 1e9, -3 for 1e-3, and 0 for 1.
//
// Factors that are not positive and finite (0, negative, NaN, Inf) yield 0.
func getExponent(p float64) int {
	if p <= 0 || math.IsNaN(p) || math.IsInf(p, 0) {
		return 0
	}

	// Tolerance so that rounding errors accumulated by the repeated
	// scaling cannot cause one extra iteration.
	const eps = 1e-9
	count := 0

	switch {
	case p > 1.0+eps:
		for p > 1.0+eps {
			p /= 1000.0
			count++
		}
	case p < 1.0-eps:
		// Sub-unit prefixes (milli, micro, ...) map to negative exponents.
		for p < 1.0-eps {
			p *= 1000.0
			count--
		}
	}

	return count * 3
}
|
||||
|
||||
func newPrefixFromFactor(op ccunits.Prefix, e int) ccunits.Prefix {
|
||||
f := float64(op)
|
||||
exp := math.Pow10(getExponent(f) - e)
|
||||
return ccunits.Prefix(exp)
|
||||
}
|
||||
|
||||
func Normalize(avg float64, p string) (float64, string) {
|
||||
f, e := getNormalizationFactor(avg)
|
||||
|
||||
if e != 0 {
|
||||
np := newPrefixFromFactor(ccunits.NewPrefix(p), e)
|
||||
return f, np.Prefix()
|
||||
}
|
||||
|
||||
return f, p
|
||||
}
|
64
internal/importer/normalize_test.go
Normal file
64
internal/importer/normalize_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package importer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
ccunits "github.com/ClusterCockpit/cc-units"
|
||||
)
|
||||
|
||||
func TestNormalizeFactor(t *testing.T) {
|
||||
// var us string
|
||||
s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
|
||||
// r := []float64{3, 24, 390, 391}
|
||||
|
||||
total := 0.0
|
||||
for _, number := range s {
|
||||
total += number
|
||||
}
|
||||
avg := total / float64(len(s))
|
||||
|
||||
fmt.Printf("AVG: %e\n", avg)
|
||||
f, e := getNormalizationFactor(avg)
|
||||
|
||||
fmt.Printf("Factor %e Count %d\n", f, e)
|
||||
|
||||
np := ccunits.NewPrefix("")
|
||||
|
||||
fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
|
||||
|
||||
p := newPrefixFromFactor(np, e)
|
||||
|
||||
if p.Prefix() != "G" {
|
||||
t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeKeep(t *testing.T) {
|
||||
s := []float64{3.0, 24.0, 390.0, 391.0}
|
||||
|
||||
total := 0.0
|
||||
for _, number := range s {
|
||||
total += number
|
||||
}
|
||||
avg := total / float64(len(s))
|
||||
|
||||
fmt.Printf("AVG: %e\n", avg)
|
||||
f, e := getNormalizationFactor(avg)
|
||||
|
||||
fmt.Printf("Factor %e Count %d\n", f, e)
|
||||
|
||||
np := ccunits.NewPrefix("G")
|
||||
|
||||
fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
|
||||
|
||||
p := newPrefixFromFactor(np, e)
|
||||
|
||||
if p.Prefix() != "G" {
|
||||
t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
|
||||
}
|
||||
}
|
746
internal/importer/testdata/cluster-fritz.json
vendored
Normal file
746
internal/importer/testdata/cluster-fritz.json
vendored
Normal file
@@ -0,0 +1,746 @@
|
||||
{
|
||||
"name": "fritz",
|
||||
"metricConfig": [
|
||||
{
|
||||
"name": "cpu_load",
|
||||
"unit": {
|
||||
"base": ""
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "avg",
|
||||
"timestep": 60,
|
||||
"peak": 72,
|
||||
"normal": 72,
|
||||
"caution": 36,
|
||||
"alert": 20
|
||||
},
|
||||
{
|
||||
"name": "cpu_user",
|
||||
"unit": {
|
||||
"base": ""
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "avg",
|
||||
"timestep": 60,
|
||||
"peak": 100,
|
||||
"normal": 50,
|
||||
"caution": 20,
|
||||
"alert": 10
|
||||
},
|
||||
{
|
||||
"name": "mem_used",
|
||||
"unit": {
|
||||
"base": "B",
|
||||
"prefix": "G"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 256,
|
||||
"normal": 128,
|
||||
"caution": 200,
|
||||
"alert": 240
|
||||
},
|
||||
{
|
||||
"name": "flops_any",
|
||||
"unit": {
|
||||
"base": "F/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 5600,
|
||||
"normal": 1000,
|
||||
"caution": 200,
|
||||
"alert": 50
|
||||
},
|
||||
{
|
||||
"name": "flops_sp",
|
||||
"unit": {
|
||||
"base": "F/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 5600,
|
||||
"normal": 1000,
|
||||
"caution": 200,
|
||||
"alert": 50
|
||||
},
|
||||
{
|
||||
"name": "flops_dp",
|
||||
"unit": {
|
||||
"base": "F/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 2300,
|
||||
"normal": 500,
|
||||
"caution": 100,
|
||||
"alert": 50
|
||||
},
|
||||
{
|
||||
"name": "mem_bw",
|
||||
"unit": {
|
||||
"base": "B/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"scope": "socket",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 350,
|
||||
"normal": 100,
|
||||
"caution": 50,
|
||||
"alert": 10
|
||||
},
|
||||
{
|
||||
"name": "clock",
|
||||
"unit": {
|
||||
"base": "Hz",
|
||||
"prefix": "M"
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "avg",
|
||||
"timestep": 60,
|
||||
"peak": 3000,
|
||||
"normal": 2400,
|
||||
"caution": 1800,
|
||||
"alert": 1200
|
||||
},
|
||||
{
|
||||
"name": "cpu_power",
|
||||
"unit": {
|
||||
"base": "W"
|
||||
},
|
||||
"scope": "socket",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 500,
|
||||
"normal": 250,
|
||||
"caution": 100,
|
||||
"alert": 50
|
||||
},
|
||||
{
|
||||
"name": "mem_power",
|
||||
"unit": {
|
||||
"base": "W"
|
||||
},
|
||||
"scope": "socket",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 100,
|
||||
"normal": 50,
|
||||
"caution": 20,
|
||||
"alert": 10
|
||||
},
|
||||
{
|
||||
"name": "ipc",
|
||||
"unit": {
|
||||
"base": "IPC"
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "avg",
|
||||
"timestep": 60,
|
||||
"peak": 4,
|
||||
"normal": 2,
|
||||
"caution": 1,
|
||||
"alert": 0.5
|
||||
},
|
||||
{
|
||||
"name": "vectorization_ratio",
|
||||
"unit": {
|
||||
"base": ""
|
||||
},
|
||||
"scope": "hwthread",
|
||||
"aggregation": "avg",
|
||||
"timestep": 60,
|
||||
"peak": 100,
|
||||
"normal": 60,
|
||||
"caution": 40,
|
||||
"alert": 10
|
||||
},
|
||||
{
|
||||
"name": "ib_recv",
|
||||
"unit": {
|
||||
"base": "B/s"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 1250000,
|
||||
"normal": 6000000,
|
||||
"caution": 200,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "ib_xmit",
|
||||
"unit": {
|
||||
"base": "B/s"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 1250000,
|
||||
"normal": 6000000,
|
||||
"caution": 200,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "ib_recv_pkts",
|
||||
"unit": {
|
||||
"base": ""
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 6,
|
||||
"normal": 4,
|
||||
"caution": 2,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "ib_xmit_pkts",
|
||||
"unit": {
|
||||
"base": ""
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 6,
|
||||
"normal": 4,
|
||||
"caution": 2,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "nfs4_read",
|
||||
"unit": {
|
||||
"base": "B/s",
|
||||
"prefix": "M"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 6,
|
||||
"normal": 4,
|
||||
"caution": 2,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "nfs4_write",
|
||||
"unit": {
|
||||
"base": "B/s",
|
||||
"prefix": "M"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 6,
|
||||
"normal": 4,
|
||||
"caution": 2,
|
||||
"alert": 1
|
||||
},
|
||||
{
|
||||
"name": "nfs4_total",
|
||||
"unit": {
|
||||
"base": "B/s",
|
||||
"prefix": "M"
|
||||
},
|
||||
"scope": "node",
|
||||
"aggregation": "sum",
|
||||
"timestep": 60,
|
||||
"peak": 6,
|
||||
"normal": 4,
|
||||
"caution": 2,
|
||||
"alert": 1
|
||||
}
|
||||
],
|
||||
"subClusters": [
|
||||
{
|
||||
"name": "main",
|
||||
"nodes": "f01[01-88],f02[01-88],f03[01-88],f03[01-88],f04[01-88],f05[01-88],f06[01-88],f07[01-88],f08[01-88],f09[01-88],f10[01-88],f11[01-56],f12[01-56]",
|
||||
"processorType": "Intel Icelake",
|
||||
"socketsPerNode": 2,
|
||||
"coresPerSocket": 36,
|
||||
"threadsPerCore": 1,
|
||||
"flopRateScalar": {
|
||||
"unit": {
|
||||
"base": "F/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"value": 432
|
||||
},
|
||||
"flopRateSimd": {
|
||||
"unit": {
|
||||
"base": "F/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"value": 9216
|
||||
},
|
||||
"memoryBandwidth": {
|
||||
"unit": {
|
||||
"base": "B/s",
|
||||
"prefix": "G"
|
||||
},
|
||||
"value": 350
|
||||
},
|
||||
"topology": {
|
||||
"node": [
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9,
|
||||
10,
|
||||
11,
|
||||
12,
|
||||
13,
|
||||
14,
|
||||
15,
|
||||
16,
|
||||
17,
|
||||
18,
|
||||
19,
|
||||
20,
|
||||
21,
|
||||
22,
|
||||
23,
|
||||
24,
|
||||
25,
|
||||
26,
|
||||
27,
|
||||
28,
|
||||
29,
|
||||
30,
|
||||
31,
|
||||
32,
|
||||
33,
|
||||
34,
|
||||
35,
|
||||
36,
|
||||
37,
|
||||
38,
|
||||
39,
|
||||
40,
|
||||
41,
|
||||
42,
|
||||
43,
|
||||
44,
|
||||
45,
|
||||
46,
|
||||
47,
|
||||
48,
|
||||
49,
|
||||
50,
|
||||
51,
|
||||
52,
|
||||
53,
|
||||
54,
|
||||
55,
|
||||
56,
|
||||
57,
|
||||
58,
|
||||
59,
|
||||
60,
|
||||
61,
|
||||
62,
|
||||
63,
|
||||
64,
|
||||
65,
|
||||
66,
|
||||
67,
|
||||
68,
|
||||
69,
|
||||
70,
|
||||
71
|
||||
],
|
||||
"socket": [
|
||||
[
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9,
|
||||
10,
|
||||
11,
|
||||
12,
|
||||
13,
|
||||
14,
|
||||
15,
|
||||
16,
|
||||
17,
|
||||
18,
|
||||
19,
|
||||
20,
|
||||
21,
|
||||
22,
|
||||
23,
|
||||
24,
|
||||
25,
|
||||
26,
|
||||
27,
|
||||
28,
|
||||
29,
|
||||
30,
|
||||
31,
|
||||
32,
|
||||
33,
|
||||
34,
|
||||
35
|
||||
],
|
||||
[
|
||||
36,
|
||||
37,
|
||||
38,
|
||||
39,
|
||||
40,
|
||||
41,
|
||||
42,
|
||||
43,
|
||||
44,
|
||||
45,
|
||||
46,
|
||||
47,
|
||||
48,
|
||||
49,
|
||||
50,
|
||||
51,
|
||||
52,
|
||||
53,
|
||||
54,
|
||||
55,
|
||||
56,
|
||||
57,
|
||||
58,
|
||||
59,
|
||||
60,
|
||||
61,
|
||||
62,
|
||||
63,
|
||||
64,
|
||||
65,
|
||||
66,
|
||||
67,
|
||||
68,
|
||||
69,
|
||||
70,
|
||||
71
|
||||
]
|
||||
],
|
||||
"memoryDomain": [
|
||||
[
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9,
|
||||
10,
|
||||
11,
|
||||
12,
|
||||
13,
|
||||
14,
|
||||
15,
|
||||
16,
|
||||
17
|
||||
],
|
||||
[
|
||||
18,
|
||||
19,
|
||||
20,
|
||||
21,
|
||||
22,
|
||||
23,
|
||||
24,
|
||||
25,
|
||||
26,
|
||||
27,
|
||||
28,
|
||||
29,
|
||||
30,
|
||||
31,
|
||||
32,
|
||||
33,
|
||||
34,
|
||||
35
|
||||
],
|
||||
[
|
||||
36,
|
||||
37,
|
||||
38,
|
||||
39,
|
||||
40,
|
||||
41,
|
||||
42,
|
||||
43,
|
||||
44,
|
||||
45,
|
||||
46,
|
||||
47,
|
||||
48,
|
||||
49,
|
||||
50,
|
||||
51,
|
||||
52,
|
||||
53
|
||||
],
|
||||
[
|
||||
54,
|
||||
55,
|
||||
56,
|
||||
57,
|
||||
58,
|
||||
59,
|
||||
60,
|
||||
61,
|
||||
62,
|
||||
63,
|
||||
64,
|
||||
65,
|
||||
66,
|
||||
67,
|
||||
68,
|
||||
69,
|
||||
70,
|
||||
71
|
||||
]
|
||||
],
|
||||
"core": [
|
||||
[
|
||||
0
|
||||
],
|
||||
[
|
||||
1
|
||||
],
|
||||
[
|
||||
2
|
||||
],
|
||||
[
|
||||
3
|
||||
],
|
||||
[
|
||||
4
|
||||
],
|
||||
[
|
||||
5
|
||||
],
|
||||
[
|
||||
6
|
||||
],
|
||||
[
|
||||
7
|
||||
],
|
||||
[
|
||||
8
|
||||
],
|
||||
[
|
||||
9
|
||||
],
|
||||
[
|
||||
10
|
||||
],
|
||||
[
|
||||
11
|
||||
],
|
||||
[
|
||||
12
|
||||
],
|
||||
[
|
||||
13
|
||||
],
|
||||
[
|
||||
14
|
||||
],
|
||||
[
|
||||
15
|
||||
],
|
||||
[
|
||||
16
|
||||
],
|
||||
[
|
||||
17
|
||||
],
|
||||
[
|
||||
18
|
||||
],
|
||||
[
|
||||
19
|
||||
],
|
||||
[
|
||||
20
|
||||
],
|
||||
[
|
||||
21
|
||||
],
|
||||
[
|
||||
22
|
||||
],
|
||||
[
|
||||
23
|
||||
],
|
||||
[
|
||||
24
|
||||
],
|
||||
[
|
||||
25
|
||||
],
|
||||
[
|
||||
26
|
||||
],
|
||||
[
|
||||
27
|
||||
],
|
||||
[
|
||||
28
|
||||
],
|
||||
[
|
||||
29
|
||||
],
|
||||
[
|
||||
30
|
||||
],
|
||||
[
|
||||
31
|
||||
],
|
||||
[
|
||||
32
|
||||
],
|
||||
[
|
||||
33
|
||||
],
|
||||
[
|
||||
34
|
||||
],
|
||||
[
|
||||
35
|
||||
],
|
||||
[
|
||||
36
|
||||
],
|
||||
[
|
||||
37
|
||||
],
|
||||
[
|
||||
38
|
||||
],
|
||||
[
|
||||
39
|
||||
],
|
||||
[
|
||||
40
|
||||
],
|
||||
[
|
||||
41
|
||||
],
|
||||
[
|
||||
42
|
||||
],
|
||||
[
|
||||
43
|
||||
],
|
||||
[
|
||||
44
|
||||
],
|
||||
[
|
||||
45
|
||||
],
|
||||
[
|
||||
46
|
||||
],
|
||||
[
|
||||
47
|
||||
],
|
||||
[
|
||||
48
|
||||
],
|
||||
[
|
||||
49
|
||||
],
|
||||
[
|
||||
50
|
||||
],
|
||||
[
|
||||
51
|
||||
],
|
||||
[
|
||||
52
|
||||
],
|
||||
[
|
||||
53
|
||||
],
|
||||
[
|
||||
54
|
||||
],
|
||||
[
|
||||
55
|
||||
],
|
||||
[
|
||||
56
|
||||
],
|
||||
[
|
||||
57
|
||||
],
|
||||
[
|
||||
58
|
||||
],
|
||||
[
|
||||
59
|
||||
],
|
||||
[
|
||||
60
|
||||
],
|
||||
[
|
||||
61
|
||||
],
|
||||
[
|
||||
62
|
||||
],
|
||||
[
|
||||
63
|
||||
],
|
||||
[
|
||||
64
|
||||
],
|
||||
[
|
||||
65
|
||||
],
|
||||
[
|
||||
66
|
||||
],
|
||||
[
|
||||
67
|
||||
],
|
||||
[
|
||||
68
|
||||
],
|
||||
[
|
||||
69
|
||||
],
|
||||
[
|
||||
70
|
||||
],
|
||||
[
|
||||
71
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
1
internal/importer/testdata/data-fritzError.json
vendored
Normal file
1
internal/importer/testdata/data-fritzError.json
vendored
Normal file
File diff suppressed because one or more lines are too long
1
internal/importer/testdata/data-fritzMinimal.json
vendored
Normal file
1
internal/importer/testdata/data-fritzMinimal.json
vendored
Normal file
File diff suppressed because one or more lines are too long
6
internal/importer/testdata/fritzError-golden.json
vendored
Normal file
6
internal/importer/testdata/fritzError-golden.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"jobId": 398955,
|
||||
"cluster": "fritz",
|
||||
"startTime": 1675956725,
|
||||
"duration": 260
|
||||
}
|
6
internal/importer/testdata/fritzMinimal-golden.json
vendored
Normal file
6
internal/importer/testdata/fritzMinimal-golden.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"jobId": 398764,
|
||||
"cluster": "fritz",
|
||||
"startTime": 1675954353,
|
||||
"duration": 177
|
||||
}
|
1
internal/importer/testdata/meta-fritzError.input
vendored
Normal file
1
internal/importer/testdata/meta-fritzError.input
vendored
Normal file
@@ -0,0 +1 @@
|
||||
{"jobId":398955,"user":"k106eb10","project":"k106eb","cluster":"fritz","subCluster":"main","partition":"singlenode","arrayJobId":0,"numNodes":1,"numHwthreads":72,"numAcc":0,"exclusive":1,"monitoringStatus":1,"smt":0,"jobState":"completed","duration":260,"walltime":86340,"resources":[{"hostname":"f0720"}],"metaData":{"jobName":"ams_pipeline","jobScript":"#!/bin/bash -l\n#SBATCH --job-name=ams_pipeline\n#SBATCH --time=23:59:00\n#SBATCH --partition=singlenode\n#SBATCH --ntasks=72\n#SBATCH --hint=multithread\n#SBATCH --chdir=/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11\n#SBATCH --export=NONE\nunset SLURM_EXPORT_ENV\nuss=$(whoami)\nfind /dev/shm/ -user $uss -type f -mmin +30 -delete\ncd \"/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11\"\nams_pipeline pipeline.json \u003e \"/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11/ams_pipeline_job.sh.out\" 2\u003e \"/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11/ams_pipeline_job.sh.err\"\n","slurmInfo":"\nJobId=398955 JobName=ams_pipeline\n UserId=k106eb10(210387) GroupId=80111\n Account=k106eb QOS=normal \n Requeue=False Restarts=0 BatchFlag=True \n TimeLimit=1439\n SubmitTime=2023-02-09T14:11:22\n Partition=singlenode \n NodeList=f0720\n NumNodes=1 NumCPUs=72 NumTasks=72 CPUs/Task=1\n NTasksPerNode:Socket:Core=0:None:None\n TRES_req=cpu=72,mem=250000M,node=1,billing=72\n TRES_alloc=cpu=72,node=1,billing=72\n Command=/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11/ams_pipeline_job.sh\n 
WorkDir=/home/atuin/k106eb/k106eb10/ACE/Ni-Al/DFT/VASP_PBE_500_0.125_0.1_NM/AlNi/binaries/bulk/base-hcp/occ-shaken/hcp16.occ.4.shake.0/cfg/NiAl3NiAl11\n StdErr=\n StdOut=ams_pipeline.o%j\n"},"startTime":1675956725,"statistics":{"clock":{"unit":{"base":"Hz","prefix":"M"},"avg":2335.254,"min":800.418,"max":2734.922},"cpu_load":{"unit":{"base":""},"avg":52.72,"min":34.46,"max":71.91},"cpu_power":{"unit":{"base":"W"},"avg":407.767,"min":93.932,"max":497.636},"cpu_user":{"unit":{"base":""},"avg":63.678,"min":19.872,"max":96.633},"flops_any":{"unit":{"base":"F/s","prefix":"G"},"avg":635.672,"min":0,"max":1332.874},"flops_dp":{"unit":{"base":"F/s","prefix":"G"},"avg":261.006,"min":0,"max":382.294},"flops_sp":{"unit":{"base":"F/s","prefix":"G"},"avg":113.659,"min":0,"max":568.286},"ib_recv":{"unit":{"base":"B/s"},"avg":27981.111,"min":69.4,"max":48084.589},"ib_recv_pkts":{"unit":{"base":"packets/s"},"avg":398.939,"min":0.5,"max":693.817},"ib_xmit":{"unit":{"base":"B/s"},"avg":188.513,"min":39.597,"max":724.568},"ib_xmit_pkts":{"unit":{"base":"packets/s"},"avg":0.867,"min":0.2,"max":2.933},"ipc":{"unit":{"base":"IPC"},"avg":0.944,"min":0.564,"max":1.291},"mem_bw":{"unit":{"base":"B/s","prefix":"G"},"avg":79.565,"min":0.021,"max":116.02},"mem_power":{"unit":{"base":"W"},"avg":24.692,"min":7.883,"max":31.318},"mem_used":{"unit":{"base":"B","prefix":"G"},"avg":22.566,"min":8.225,"max":27.613},"nfs4_read":{"unit":{"base":"B/s","prefix":"M"},"avg":647,"min":0,"max":1946},"nfs4_total":{"unit":{"base":"B/s","prefix":"M"},"avg":6181.6,"min":1270,"max":11411},"nfs4_write":{"unit":{"base":"B/s","prefix":"M"},"avg":22.4,"min":11,"max":29},"vectorization_ratio":{"unit":{"base":"%"},"avg":77.351,"min":0,"max":98.837}}}
|
1
internal/importer/testdata/meta-fritzMinimal.input
vendored
Normal file
1
internal/importer/testdata/meta-fritzMinimal.input
vendored
Normal file
@@ -0,0 +1 @@
|
||||
{"jobId":398764,"user":"k106eb10","project":"k106eb","cluster":"fritz","subCluster":"main","numNodes":1,"exclusive":1,"jobState":"completed","duration":177,"resources":[{"hostname":"f0649"}],"startTime":1675954353,"statistics":{"clock":{"unit":{"base":"Hz","prefix":"M"},"avg":1336.519,"min":801.564,"max":2348.215},"cpu_load":{"unit":{"base":""},"avg":31.64,"min":17.36,"max":45.54},"cpu_power":{"unit":{"base":"W"},"avg":150.018,"min":93.672,"max":261.592},"cpu_user":{"unit":{"base":""},"avg":28.518,"min":0.09,"max":57.343},"flops_any":{"unit":{"base":"F/s","prefix":"G"},"avg":45.012,"min":0,"max":135.037},"flops_dp":{"unit":{"base":"F/s","prefix":"G"},"avg":22.496,"min":0,"max":67.488},"flops_sp":{"unit":{"base":"F/s","prefix":"G"},"avg":0.02,"min":0,"max":0.061},"ib_recv":{"unit":{"base":"B/s"},"avg":14442.82,"min":219.998,"max":42581.368},"ib_recv_pkts":{"unit":{"base":"packets/s"},"avg":201.532,"min":1.25,"max":601.345},"ib_xmit":{"unit":{"base":"B/s"},"avg":282.098,"min":56.2,"max":569.363},"ib_xmit_pkts":{"unit":{"base":"packets/s"},"avg":1.228,"min":0.433,"max":2},"ipc":{"unit":{"base":"IPC"},"avg":0.77,"min":0.564,"max":0.906},"mem_bw":{"unit":{"base":"B/s","prefix":"G"},"avg":4.872,"min":0.025,"max":14.552},"mem_power":{"unit":{"base":"W"},"avg":7.725,"min":6.286,"max":10.556},"mem_used":{"unit":{"base":"B","prefix":"G"},"avg":6.162,"min":6.103,"max":6.226},"nfs4_read":{"unit":{"base":"B/s","prefix":"M"},"avg":1045.333,"min":311,"max":1525},"nfs4_total":{"unit":{"base":"B/s","prefix":"M"},"avg":6430,"min":2796,"max":11518},"nfs4_write":{"unit":{"base":"B/s","prefix":"M"},"avg":24.333,"min":0,"max":38},"vectorization_ratio":{"unit":{"base":"%"},"avg":25.528,"min":0,"max":76.585}}}
|
@@ -293,7 +293,7 @@ func (ccms *CCMetricStore) buildQueries(
|
||||
scopesLoop:
|
||||
for _, requestedScope := range scopes {
|
||||
nativeScope := mc.Scope
|
||||
if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == nil {
|
||||
if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -56,7 +56,10 @@ func Connect(driver string, db string) {
|
||||
}
|
||||
|
||||
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
|
||||
checkDBVersion(driver, dbHandle.DB)
|
||||
err = checkDBVersion(driver, dbHandle.DB)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@@ -1,351 +0,0 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package repository
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/units"
|
||||
)
|
||||
|
||||
const NamedJobInsert string = `INSERT INTO job (
|
||||
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
|
||||
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
|
||||
) VALUES (
|
||||
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
|
||||
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
||||
);`
|
||||
|
||||
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
|
||||
func HandleImportFlag(flag string) error {
|
||||
for _, pair := range strings.Split(flag, ",") {
|
||||
files := strings.Split(pair, ":")
|
||||
if len(files) != 2 {
|
||||
return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
|
||||
}
|
||||
|
||||
raw, err := os.ReadFile(files[0])
|
||||
if err != nil {
|
||||
log.Warn("Error while reading metadata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
|
||||
}
|
||||
}
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
if err := dec.Decode(&jobMeta); err != nil {
|
||||
log.Warn("Error while decoding raw json metadata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
raw, err = os.ReadFile(files[1])
|
||||
if err != nil {
|
||||
log.Warn("Error while reading jobdata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
dec = json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobData := schema.JobData{}
|
||||
if err := dec.Decode(&jobData); err != nil {
|
||||
log.Warn("Error while decoding raw json jobdata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
checkJobData(&jobData)
|
||||
SanityChecks(&jobMeta.BaseJob)
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
|
||||
if err != nil {
|
||||
log.Warn("Error while finding job in jobRepository")
|
||||
return err
|
||||
}
|
||||
|
||||
return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
|
||||
}
|
||||
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
// TODO: Other metrics...
|
||||
job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
|
||||
job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
|
||||
job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
|
||||
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Warn("Error while marshaling job resources")
|
||||
return err
|
||||
}
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Warn("Error while marshaling job metadata")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Warn("BaseJob SanityChecks failed")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
|
||||
log.Error("Error while importing job")
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
|
||||
if err != nil {
|
||||
log.Warn("Error while NamedJobInsert")
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Warn("Error while getting last insert ID")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
log.Error("Error while adding or creating tag")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the tables "job", "tag" and "jobtag" from the database and
|
||||
// repopulate them using the jobs found in `archive`.
|
||||
func InitDB() error {
|
||||
db := GetConnection()
|
||||
starttime := time.Now()
|
||||
log.Print("Building job table...")
|
||||
|
||||
// Inserts are bundled into transactions because in sqlite,
|
||||
// that speeds up inserts A LOT.
|
||||
tx, err := db.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Warn("Error while bundling transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
stmt, err := tx.PrepareNamed(NamedJobInsert)
|
||||
if err != nil {
|
||||
log.Warn("Error while preparing namedJobInsert")
|
||||
return err
|
||||
}
|
||||
tags := make(map[string]int64)
|
||||
|
||||
// Not using log.Print because we want the line to end with `\r` and
|
||||
// this function is only ever called when a special command line flag
|
||||
// is passed anyways.
|
||||
fmt.Printf("%d jobs inserted...\r", 0)
|
||||
|
||||
ar := archive.GetHandle()
|
||||
i := 0
|
||||
errorOccured := 0
|
||||
|
||||
for jobContainer := range ar.Iter(false) {
|
||||
|
||||
jobMeta := jobContainer.Meta
|
||||
|
||||
// // Bundle 100 inserts into one transaction for better performance:
|
||||
if i%10 == 0 {
|
||||
if tx != nil {
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Warn("Error while committing transactions for jobMeta")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
tx, err = db.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Warn("Error while bundling transactions for jobMeta")
|
||||
return err
|
||||
}
|
||||
|
||||
stmt = tx.NamedStmt(stmt)
|
||||
fmt.Printf("%d jobs inserted...\r", i)
|
||||
}
|
||||
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
// TODO: Other metrics...
|
||||
job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
|
||||
job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
|
||||
job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
|
||||
job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
|
||||
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := stmt.Exec(job)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
tagstr := tag.Name + ":" + tag.Type
|
||||
tagId, ok := tags[tagstr]
|
||||
if !ok {
|
||||
res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
|
||||
if err != nil {
|
||||
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
|
||||
return err
|
||||
}
|
||||
tagId, err = res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Warn("Error while getting last insert ID")
|
||||
return err
|
||||
}
|
||||
tags[tagstr] = tagId
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
|
||||
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", id, tagId)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
if errorOccured > 0 {
|
||||
log.Warnf("Error in import of %d jobs!", errorOccured)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Warn("Error while committing SQL transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
|
||||
return nil
|
||||
}
|
||||
|
||||
// This function also sets the subcluster if necessary!
|
||||
func SanityChecks(job *schema.BaseJob) error {
|
||||
if c := archive.GetCluster(job.Cluster); c == nil {
|
||||
return fmt.Errorf("no such cluster: %v", job.Cluster)
|
||||
}
|
||||
if err := archive.AssignSubCluster(job); err != nil {
|
||||
log.Warn("Error while assigning subcluster to job")
|
||||
return err
|
||||
}
|
||||
if !job.State.Valid() {
|
||||
return fmt.Errorf("not a valid job state: %v", job.State)
|
||||
}
|
||||
if len(job.Resources) == 0 || len(job.User) == 0 {
|
||||
return fmt.Errorf("'resources' and 'user' should not be empty")
|
||||
}
|
||||
if *job.NumAcc < 0 || *job.NumHWThreads < 0 || job.NumNodes < 1 {
|
||||
return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
|
||||
}
|
||||
if len(job.Resources) != int(job.NumNodes) {
|
||||
return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadJobStat(job *schema.JobMeta, metric string) float64 {
|
||||
if stats, ok := job.Statistics[metric]; ok {
|
||||
return stats.Avg
|
||||
}
|
||||
|
||||
return 0.0
|
||||
}
|
||||
|
||||
func checkJobData(d *schema.JobData) error {
|
||||
for _, scopes := range *d {
|
||||
var newUnit string
|
||||
// Add node scope if missing
|
||||
for _, metric := range scopes {
|
||||
if strings.Contains(metric.Unit.Base, "B/s") ||
|
||||
strings.Contains(metric.Unit.Base, "F/s") ||
|
||||
strings.Contains(metric.Unit.Base, "B") {
|
||||
|
||||
// First get overall avg
|
||||
sum := 0.0
|
||||
for _, s := range metric.Series {
|
||||
sum += s.Statistics.Avg
|
||||
}
|
||||
|
||||
avg := sum / float64(len(metric.Series))
|
||||
|
||||
for _, s := range metric.Series {
|
||||
fp := schema.ConvertFloatToFloat64(s.Data)
|
||||
// Normalize values with new unit prefix
|
||||
oldUnit := metric.Unit.Base
|
||||
units.NormalizeSeries(fp, avg, oldUnit, &newUnit)
|
||||
s.Data = schema.GetFloat64ToFloat(fp)
|
||||
}
|
||||
metric.Unit.Base = newUnit
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -96,6 +96,50 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) Optimize() error {
|
||||
var err error
|
||||
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
if _, err = r.DB.Exec(`VACUUM`); err != nil {
|
||||
return err
|
||||
}
|
||||
case "mysql":
|
||||
log.Info("Optimize currently not supported for mysql driver")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) Flush() error {
|
||||
var err error
|
||||
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
if _, err = r.DB.Exec(`DELETE FROM jobtag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`DELETE FROM tag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`DELETE FROM job`); err != nil {
|
||||
return err
|
||||
}
|
||||
case "mysql":
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE jobtag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE tag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE job`); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
|
||||
jobLink := &model.JobLink{}
|
||||
if err := row.Scan(
|
||||
@@ -548,7 +592,7 @@ func (r *JobRepository) FindUserOrProjectOrJobname(ctx context.Context, searchte
|
||||
func (r *JobRepository) FindColumnValue(user *auth.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
|
||||
compareStr := " = ?"
|
||||
query := searchterm
|
||||
if isLike == true {
|
||||
if isLike {
|
||||
compareStr = " LIKE ?"
|
||||
query = "%" + searchterm + "%"
|
||||
}
|
||||
@@ -689,6 +733,38 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
|
||||
|
||||
query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||
"job.start_time < %d", startTime))
|
||||
|
||||
sql, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
log.Warn("Error while converting query to sql")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jobs := make([]*schema.Job, 0, 50)
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
log.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// GraphQL validation should make sure that no unknown values can be specified.
|
||||
var groupBy2column = map[model.Aggregate]string{
|
||||
model.AggregateUser: "job.user",
|
||||
@@ -706,9 +782,10 @@ func (r *JobRepository) JobsStatistics(ctx context.Context,
|
||||
stats := map[string]*model.JobsStatistics{}
|
||||
var castType string
|
||||
|
||||
if r.driver == "sqlite3" {
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
castType = "int"
|
||||
} else if r.driver == "mysql" {
|
||||
case "mysql":
|
||||
castType = "unsigned"
|
||||
}
|
||||
|
||||
@@ -890,7 +967,6 @@ func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
|
||||
value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
|
||||
|
||||
start := time.Now()
|
||||
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
|
||||
query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job"))
|
||||
|
||||
if qerr != nil {
|
||||
@@ -924,3 +1000,121 @@ func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
|
||||
log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
|
||||
return points, nil
|
||||
}
|
||||
|
||||
const NamedJobInsert string = `INSERT INTO job (
|
||||
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
|
||||
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
|
||||
) VALUES (
|
||||
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
|
||||
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
||||
);`
|
||||
|
||||
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
|
||||
res, err := r.DB.NamedExec(NamedJobInsert, job)
|
||||
if err != nil {
|
||||
log.Warn("Error while NamedJobInsert")
|
||||
return 0, err
|
||||
}
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Warn("Error while getting last insert ID")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// Transaction pairs an open database transaction with the named job-insert
// statement that is bound to it. It is created by TransactionInit and passed
// to the other Transaction* methods of JobRepository.
type Transaction struct {
	tx   *sqlx.Tx        // currently open transaction
	stmt *sqlx.NamedStmt // NamedJobInsert statement bound to tx
}
|
||||
|
||||
func (r *JobRepository) TransactionInit() (*Transaction, error) {
|
||||
var err error
|
||||
t := new(Transaction)
|
||||
// Inserts are bundled into transactions because in sqlite,
|
||||
// that speeds up inserts A LOT.
|
||||
t.tx, err = r.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Warn("Error while bundling transactions")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
|
||||
if err != nil {
|
||||
log.Warn("Error while preparing namedJobInsert")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) TransactionCommit(t *Transaction) error {
|
||||
var err error
|
||||
if t.tx != nil {
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
log.Warn("Error while committing transactions")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
t.tx, err = r.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Warn("Error while bundling transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
t.stmt = t.tx.NamedStmt(t.stmt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) TransactionEnd(t *Transaction) error {
|
||||
if err := t.tx.Commit(); err != nil {
|
||||
log.Warn("Error while committing SQL transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
|
||||
res, err := t.stmt.Exec(job)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
|
||||
res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
|
||||
if err != nil {
|
||||
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
|
||||
return 0, err
|
||||
}
|
||||
tagId, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Warn("Error while getting last insert ID")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return tagId, nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
|
||||
if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
|
||||
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -12,19 +12,21 @@ import (
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.Init("info", true)
|
||||
Connect("sqlite3", "../../test/test.db")
|
||||
}
|
||||
|
||||
func setup(t *testing.T) *JobRepository {
|
||||
log.Init("info", true)
|
||||
dbfilepath := "testdata/test.db"
|
||||
err := MigrateDB("sqlite3", dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
Connect("sqlite3", dbfilepath)
|
||||
return GetJobRepository()
|
||||
}
|
||||
|
||||
func TestFind(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
jobId, cluster, startTime := int64(1404396), "emmy", int64(1609299584)
|
||||
jobId, cluster, startTime := int64(398998), "fritz", int64(1675957496)
|
||||
job, err := r.Find(&jobId, &cluster, &startTime)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -32,7 +34,7 @@ func TestFind(t *testing.T) {
|
||||
|
||||
// fmt.Printf("%+v", job)
|
||||
|
||||
if job.ID != 1366 {
|
||||
if job.ID != 5 {
|
||||
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID)
|
||||
}
|
||||
}
|
||||
@@ -40,14 +42,14 @@ func TestFind(t *testing.T) {
|
||||
func TestFindById(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
job, err := r.FindById(1366)
|
||||
job, err := r.FindById(5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// fmt.Printf("%+v", job)
|
||||
|
||||
if job.JobID != 1404396 {
|
||||
if job.JobID != 398998 {
|
||||
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1404396", job.JobID)
|
||||
}
|
||||
}
|
||||
@@ -63,7 +65,7 @@ func TestGetTags(t *testing.T) {
|
||||
fmt.Printf("TAGS %+v \n", tags)
|
||||
// fmt.Printf("COUNTS %+v \n", counts)
|
||||
|
||||
if counts["bandwidth"] != 6 {
|
||||
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 6", counts["load-imbalance"])
|
||||
if counts["bandwidth"] != 3 {
|
||||
t.Errorf("wrong tag count \ngot: %d \nwant: 3", counts["bandwidth"])
|
||||
}
|
||||
}
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"database/sql"
|
||||
"embed"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
@@ -22,37 +21,37 @@ const Version uint = 3
|
||||
//go:embed migrations/*
|
||||
var migrationFiles embed.FS
|
||||
|
||||
func checkDBVersion(backend string, db *sql.DB) {
|
||||
func checkDBVersion(backend string, db *sql.DB) error {
|
||||
var m *migrate.Migrate
|
||||
|
||||
if backend == "sqlite3" {
|
||||
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithInstance("iofs", d, "sqlite3", driver)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
} else if backend == "mysql" {
|
||||
case "mysql":
|
||||
driver, err := mysql.WithInstance(db, &mysql.Config{})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithInstance("iofs", d, "mysql", driver)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,25 +60,26 @@ func checkDBVersion(backend string, db *sql.DB) {
|
||||
if err == migrate.ErrNilVersion {
|
||||
log.Warn("Legacy database without version or missing database file!")
|
||||
} else {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if v < Version {
|
||||
log.Warnf("Unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, Version)
|
||||
os.Exit(0)
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, Version)
|
||||
}
|
||||
|
||||
if v > Version {
|
||||
log.Warnf("Unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool!", v, Version)
|
||||
os.Exit(0)
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) {
|
||||
func MigrateDB(backend string, db string) error {
|
||||
var m *migrate.Migrate
|
||||
|
||||
if backend == "sqlite3" {
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -87,17 +87,17 @@ func MigrateDB(backend string, db string) {
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
} else if backend == "mysql" {
|
||||
case "mysql":
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,9 +105,10 @@ func MigrateDB(backend string, db string) {
|
||||
if err == migrate.ErrNoChange {
|
||||
log.Info("DB already up to date!")
|
||||
} else {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
@@ -31,13 +31,15 @@ CREATE TABLE IF NOT EXISTS job (
|
||||
net_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
file_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
UNIQUE (job_id, cluster, start_time)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tag (
|
||||
id INTEGER PRIMARY KEY,
|
||||
tag_type VARCHAR(255) NOT NULL,
|
||||
tag_name VARCHAR(255) NOT NULL,
|
||||
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
|
||||
UNIQUE (tag_type, tag_name));
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobtag (
|
||||
job_id INTEGER,
|
||||
|
@@ -7,19 +7,19 @@ CREATE TABLE IF NOT EXISTS job (
|
||||
|
||||
user VARCHAR(255) NOT NULL,
|
||||
project VARCHAR(255) NOT NULL,
|
||||
partition VARCHAR(255) NOT NULL,
|
||||
array_job_id BIGINT NOT NULL,
|
||||
duration INT NOT NULL DEFAULT 0,
|
||||
walltime INT NOT NULL DEFAULT 0,
|
||||
job_state VARCHAR(255) NOT NULL
|
||||
partition VARCHAR(255),
|
||||
array_job_id BIGINT,
|
||||
duration INT NOT NULL,
|
||||
walltime INT NOT NULL,
|
||||
job_state VARCHAR(255) NOT NULL
|
||||
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
|
||||
'stopped', 'timeout', 'preempted', 'out_of_memory')),
|
||||
meta_data TEXT, -- JSON
|
||||
resources TEXT NOT NULL, -- JSON
|
||||
|
||||
num_nodes INT NOT NULL,
|
||||
num_hwthreads INT NOT NULL,
|
||||
num_acc INT NOT NULL,
|
||||
num_hwthreads INT,
|
||||
num_acc INT,
|
||||
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
|
||||
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
|
||||
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
|
||||
@@ -31,13 +31,15 @@ CREATE TABLE IF NOT EXISTS job (
|
||||
net_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
file_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
UNIQUE (job_id, cluster, start_time)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tag (
|
||||
id INTEGER PRIMARY KEY,
|
||||
tag_type VARCHAR(255) NOT NULL,
|
||||
tag_name VARCHAR(255) NOT NULL,
|
||||
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
|
||||
UNIQUE (tag_type, tag_name));
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobtag (
|
||||
job_id INTEGER,
|
||||
|
@@ -34,11 +34,13 @@ func (r *JobRepository) QueryJobs(
|
||||
|
||||
if order != nil {
|
||||
field := toSnakeCase(order.Field)
|
||||
if order.Order == model.SortDirectionEnumAsc {
|
||||
|
||||
switch order.Order {
|
||||
case model.SortDirectionEnumAsc:
|
||||
query = query.OrderBy(fmt.Sprintf("job.%s ASC", field))
|
||||
} else if order.Order == model.SortDirectionEnumDesc {
|
||||
case model.SortDirectionEnumDesc:
|
||||
query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
|
||||
} else {
|
||||
default:
|
||||
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order")
|
||||
}
|
||||
}
|
||||
@@ -159,7 +161,7 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (queryOut sq.Sel
|
||||
return query.Where("job.user = ?", user.Username), nil
|
||||
} else { // Unauthorized : Error
|
||||
var qnil sq.SelectBuilder
|
||||
return qnil, errors.New(fmt.Sprintf("User '%s' with unknown roles! [%#v]\n", user.Username, user.Roles))
|
||||
return qnil, fmt.Errorf("user '%s' with unknown roles [%#v]", user.Username, user.Roles)
|
||||
}
|
||||
}
|
||||
|
||||
|
BIN
internal/repository/testdata/test.db
vendored
Normal file
BIN
internal/repository/testdata/test.db
vendored
Normal file
Binary file not shown.
@@ -11,12 +11,10 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Connect("sqlite3", "../../test/test.db")
|
||||
}
|
||||
|
||||
func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
const testconfig = `{
|
||||
"addr": "0.0.0.0:8080",
|
||||
@@ -34,6 +32,15 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
|
||||
} } ]
|
||||
}`
|
||||
|
||||
log.Init("info", true)
|
||||
dbfilepath := "testdata/test.db"
|
||||
err := MigrateDB("sqlite3", dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
Connect("sqlite3", dbfilepath)
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
cfgFilePath := filepath.Join(tmpdir, "config.json")
|
||||
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
|
||||
@@ -43,9 +50,10 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
config.Init(cfgFilePath)
|
||||
return GetUserCfgRepo()
|
||||
}
|
||||
|
||||
func TestGetUIConfig(t *testing.T) {
|
||||
r := setupUserTest(t)
|
||||
u := auth.User{Username: "jan"}
|
||||
u := auth.User{Username: "demo"}
|
||||
|
||||
cfg, err := r.GetUIConfig(&u)
|
||||
if err != nil {
|
||||
@@ -53,10 +61,9 @@ func TestGetUIConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
tmp := cfg["plot_list_selectedMetrics"]
|
||||
metrics := tmp.([]interface{})
|
||||
|
||||
str := metrics[2].(string)
|
||||
if str != "mem_bw" {
|
||||
metrics := tmp.([]string)
|
||||
str := metrics[2]
|
||||
if str != "mem_used" {
|
||||
t.Errorf("wrong config\ngot: %s \nwant: mem_bw", str)
|
||||
}
|
||||
}
|
||||
|
@@ -24,7 +24,7 @@ import (
|
||||
func LoadEnv(file string) error {
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
log.Error("Error while opening file")
|
||||
log.Error("Error while opening .env file")
|
||||
return err
|
||||
}
|
||||
|
||||
|
77
internal/util/compress.go
Normal file
77
internal/util/compress.go
Normal file
@@ -0,0 +1,77 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
func CompressFile(fileIn string, fileOut string) error {
|
||||
originalFile, err := os.Open(fileIn)
|
||||
if err != nil {
|
||||
log.Errorf("CompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
defer originalFile.Close()
|
||||
|
||||
gzippedFile, err := os.Create(fileOut)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("CompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
defer gzippedFile.Close()
|
||||
|
||||
gzipWriter := gzip.NewWriter(gzippedFile)
|
||||
defer gzipWriter.Close()
|
||||
|
||||
_, err = io.Copy(gzipWriter, originalFile)
|
||||
if err != nil {
|
||||
log.Errorf("CompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
gzipWriter.Flush()
|
||||
if err := os.Remove(fileIn); err != nil {
|
||||
log.Errorf("CompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func UncompressFile(fileIn string, fileOut string) error {
|
||||
gzippedFile, err := os.Open(fileIn)
|
||||
if err != nil {
|
||||
log.Errorf("UncompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
defer gzippedFile.Close()
|
||||
|
||||
gzipReader, _ := gzip.NewReader(gzippedFile)
|
||||
defer gzipReader.Close()
|
||||
|
||||
uncompressedFile, err := os.Create(fileOut)
|
||||
if err != nil {
|
||||
log.Errorf("UncompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
defer uncompressedFile.Close()
|
||||
|
||||
_, err = io.Copy(uncompressedFile, gzipReader)
|
||||
if err != nil {
|
||||
log.Errorf("UncompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(fileIn); err != nil {
|
||||
log.Errorf("UncompressFile() error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
107
internal/util/copy.go
Normal file
107
internal/util/copy.go
Normal file
@@ -0,0 +1,107 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// CopyFile copies the contents of src to dst, creating dst or truncating it
// if it already exists, syncs dst to disk and then applies the file mode of
// src to dst.
//
// The first error encountered is returned. An error from closing dst is only
// reported when nothing else failed, so it never hides the original cause.
func CopyFile(src, dst string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Keep an earlier, more relevant error instead of overwriting it.
		if e := out.Close(); e != nil && err == nil {
			err = e
		}
	}()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}

	if err = out.Sync(); err != nil {
		return err
	}

	si, err := os.Stat(src)
	if err != nil {
		return err
	}

	return os.Chmod(dst, si.Mode())
}
|
||||
|
||||
func CopyDir(src string, dst string) (err error) {
|
||||
src = filepath.Clean(src)
|
||||
dst = filepath.Clean(dst)
|
||||
|
||||
si, err := os.Stat(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !si.IsDir() {
|
||||
return fmt.Errorf("source is not a directory")
|
||||
}
|
||||
|
||||
_, err = os.Stat(dst)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
return fmt.Errorf("destination already exists")
|
||||
}
|
||||
|
||||
err = os.MkdirAll(dst, si.Mode())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
entries, err := ioutil.ReadDir(src)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
srcPath := filepath.Join(src, entry.Name())
|
||||
dstPath := filepath.Join(dst, entry.Name())
|
||||
|
||||
if entry.IsDir() {
|
||||
err = CopyDir(srcPath, dstPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Skip symlinks.
|
||||
if entry.Mode()&os.ModeSymlink != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
err = CopyFile(srcPath, dstPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
34
internal/util/diskUsage.go
Normal file
34
internal/util/diskUsage.go
Normal file
@@ -0,0 +1,34 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
func DiskUsage(dirpath string) float64 {
|
||||
var size int64
|
||||
|
||||
dir, err := os.Open(dirpath)
|
||||
if err != nil {
|
||||
log.Errorf("DiskUsage() error: %v", err)
|
||||
return 0
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
files, err := dir.Readdir(-1)
|
||||
if err != nil {
|
||||
log.Errorf("DiskUsage() error: %v", err)
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
size += file.Size()
|
||||
}
|
||||
|
||||
return float64(size) * 1e-6
|
||||
}
|
34
internal/util/fstat.go
Normal file
34
internal/util/fstat.go
Normal file
@@ -0,0 +1,34 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
// CheckFileExists reports whether filePath exists. It returns false only when
// os.Stat fails with an error matching os.ErrNotExist; any other outcome,
// including other Stat errors, counts as existing.
func CheckFileExists(filePath string) bool {
	if _, statErr := os.Stat(filePath); errors.Is(statErr, os.ErrNotExist) {
		return false
	}
	return true
}
|
||||
|
||||
func GetFilesize(filePath string) int64 {
|
||||
fileInfo, err := os.Stat(filePath)
|
||||
if err != nil {
|
||||
log.Errorf("Error on Stat %s: %v", filePath, err)
|
||||
}
|
||||
return fileInfo.Size()
|
||||
}
|
||||
|
||||
func GetFilecount(path string) int {
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
log.Errorf("Error on ReadDir %s: %v", path, err)
|
||||
}
|
||||
|
||||
return len(files)
|
||||
}
|
21
internal/util/statistics.go
Normal file
21
internal/util/statistics.go
Normal file
@@ -0,0 +1,21 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util
|
||||
|
||||
import "golang.org/x/exp/constraints"
|
||||
|
||||
func Min[T constraints.Ordered](a, b T) T {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func Max[T constraints.Ordered](a, b T) T {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
Reference in New Issue
Block a user