From 7359a556d99fb110b09ea01cd9db8a0720074cb3 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 24 Jan 2022 10:08:47 +0100 Subject: [PATCH] new test: start/stop job via REST API --- README.md | 7 +- api/rest.go | 8 +- api_test.go | 283 ++++++++++++++++++++++++++++++++++++++++++++++++ schema/float.go | 9 +- 4 files changed, 297 insertions(+), 10 deletions(-) create mode 100644 api_test.go diff --git a/README.md b/README.md index e8e7a42..6d8290a 100644 --- a/README.md +++ b/README.md @@ -81,13 +81,12 @@ This project uses [gqlgen](https://github.com/99designs/gqlgen) for the GraphQL ### TODO -- [ ] fix frontend - [ ] write (unit) tests - [ ] make tokens and sessions (currently based on cookies) expire after some configurable time - [ ] when authenticating using a JWT, check if that user still exists -- [ ] allow mysql as database and passing the database uri as environment variable +- [ ] allow passing the database uri as environment variable - [ ] fix InfluxDB MetricDataRepository (new or old line-protocol format? Support node-level metrics only?) -- [ ] support all metric scopes - [ ] documentation, comments in the code base - [ ] write more TODOs -- [ ] caching +- [ ] use more prepared statements and [sqrl](https://github.com/elgris/sqrl) instead of *squirrel* +- [ ] replace `github.com/ClusterCockpit/cc-jobarchive` with `github.com/ClusterCockpit/cc-backend` in all import paths diff --git a/api/rest.go b/api/rest.go index 65388eb..15ca980 100644 --- a/api/rest.go +++ b/api/rest.go @@ -183,8 +183,8 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - if config.GetClusterConfig(req.Cluster) == nil { - http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", req.Cluster), http.StatusBadRequest) + if config.GetClusterConfig(req.Cluster) == nil || config.GetPartition(req.Cluster, req.Partition) == nil { + http.Error(rw, fmt.Sprintf("cluster %#v or partition %#v does not exist", req.Cluster, req.Partition), http.StatusBadRequest) return } @@ -209,6 +209,10 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } + if req.State == "" { + req.State = schema.JobStateRunning + } + req.RawResources, err = json.Marshal(req.Resources) if err != nil { log.Fatal(err) diff --git a/api_test.go b/api_test.go new file mode 100644 index 0000000..a94b737 --- /dev/null +++ b/api_test.go @@ -0,0 +1,283 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "reflect" + "strconv" + "testing" + + "github.com/ClusterCockpit/cc-jobarchive/api" + "github.com/ClusterCockpit/cc-jobarchive/config" + "github.com/ClusterCockpit/cc-jobarchive/graph" + "github.com/ClusterCockpit/cc-jobarchive/metricdata" + "github.com/ClusterCockpit/cc-jobarchive/schema" + "github.com/gorilla/mux" + "github.com/jmoiron/sqlx" +) + +func setup(t *testing.T) *api.RestApi { + if db != nil { + panic("prefer using sub-tests (`t.Run`) or implement `cleanup` before calling setup twice.") + } + + devNull, err := os.Open(os.DevNull) + if err != nil { + t.Fatal(err) + } + + // Makes output cleaner + log.SetOutput(devNull) + + const testclusterJson = `{ + "name": "testcluster", + "partitions": [ + { + "name": "default", + "processorType": "Intel Core i7-4770", + "socketsPerNode": 1, + "coresPerSocket": 4, + "threadsPerCore": 2, + "flopRateScalar": 44, + "flopRateSimd": 704, + "memoryBandwidth": 80, + "topology": { + "node": [0, 1, 2, 3, 4, 5, 6, 7], + "socket": [[0, 1, 2, 3, 4, 5, 6, 7]], + "memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]], + "die": [[0, 1, 2, 3, 4, 5, 6, 7]], + "core": [[0], [1], [2], [3], [4], [5], [6], [7]], + "accelerators": [] + } + } + ], + "metricDataRepository": {"kind": "test"}, + "metricConfig": [ + { + "name": "load_one", + "unit": "load", + "scope": "node", + "timestep": 60, + "peak": 8, + "normal": 0, + "caution": 0, + "alert": 0 + } + ], + "filterRanges": { + "numNodes": { "from": 1, "to": 1 }, + "duration": { "from": 0, "to": 172800 }, + "startTime": { "from": "2010-01-01T00:00:00Z", "to": null } + } + }` + + tmpdir := t.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + if err := os.Mkdir(jobarchive, 0777); err != nil { + t.Fatal(err) + } + + if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil { + t.Fatal(err) + } + + if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil { + t.Fatal(err) + } + + dbfilepath := filepath.Join(tmpdir, "test.db") + f, err := os.Create(dbfilepath) + if err != nil { + t.Fatal(err) + } + f.Close() + + db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath)) + if err != nil { + t.Fatal(err) + } + + db.SetMaxOpenConns(1) + if _, err := db.Exec(JOBS_DB_SCHEMA); err != nil { + t.Fatal(err) + } + + if err := config.Init(db, false, programConfig.UiDefaults, jobarchive); err != nil { + t.Fatal(err) + } + + if err := metricdata.Init(jobarchive, false); err != nil { + t.Fatal(err) + } + + resolver := &graph.Resolver{DB: db} + resolver.Init() + return &api.RestApi{ + DB: db, + Resolver: resolver, + } +} + +func cleanup() { + log.SetOutput(os.Stderr) + // TODO: Clear all caches, reset all modules, etc... +} + +/* + * This function starts a job, stops it, and then reads its data from the job-archive. + * Do not run sub-tests in parallel! Tests should not be run in parallel at all, because + * at least `setup` modifies global state. Log-Output is redirected to /dev/null on purpose. + */ +func TestRestApi(t *testing.T) { + restapi := setup(t) + t.Cleanup(cleanup) + + testData := schema.JobData{ + "load_one": map[schema.MetricScope]*schema.JobMetric{ + schema.MetricScopeNode: { + Unit: "load", + Scope: schema.MetricScopeNode, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "testhost", + Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, + Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, + }, + }, + }, + }, + } + + metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + return testData, nil + } + + r := mux.NewRouter() + restapi.MountRoutes(r) + + const startJobBody string = `{ + "jobId": 123, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "arrayJobId": 0, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "exclusive": 1, + "monitoringStatus": 1, + "smt": 1, + "tags": [], + "resources": [ + { + "hostname": "testhost", + "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7] + } + ], + "metaData": null, + "startTime": 123456789 + }` + + var dbid int64 + if ok := t.Run("StartJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/api/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody))) + recorder := httptest.NewRecorder() + + r.ServeHTTP(recorder, req) + response := recorder.Result() + if response.StatusCode != http.StatusCreated { + t.Fatal(response.Status, recorder.Body.String()) + } + + var res api.StartJobApiRespone + if err := json.Unmarshal(recorder.Body.Bytes(), &res); err != nil { + t.Fatal(err) + } + + job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(res.DBID))) + if err != nil { + t.Fatal(err) + } + + if job.JobID != 123 || + job.User != "testuser" || + job.Project != "testproj" || + job.Cluster != "testcluster" || + job.Partition != "default" || + job.ArrayJobId != 0 || + job.NumNodes != 1 || + job.NumHWThreads != 8 || + job.NumAcc != 0 || + job.Exclusive != 1 || + job.MonitoringStatus != 1 || + job.SMT != 1 || + len(job.Tags) != 0 || + !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "testhost", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || + job.StartTime.Unix() != 123456789 { + t.Fatalf("unexpected job properties: %#v", job) + } + + dbid = res.DBID + }); !ok { + return + } + + const stopJobBody string = `{ + "jobId": 123, + "startTime": 123456789, + "cluster": "testcluster", + + "jobState": "completed", + "stopTime": 123457789 + }` + + var stoppedJob *schema.Job + if ok := t.Run("StopJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/api/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody))) + recorder := httptest.NewRecorder() + + r.ServeHTTP(recorder, req) + response := recorder.Result() + if response.StatusCode != http.StatusOK { + t.Fatal(response.Status, recorder.Body.String()) + } + + restapi.OngoingArchivings.Wait() + job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(dbid))) + if err != nil { + t.Fatal(err) + } + + if job.State != schema.JobStateCompleted { + t.Fatal("expected job to be completed") + } + + if job.Duration != (123457789 - 123456789) { + t.Fatalf("unexpected job properties: %#v", job) + } + + stoppedJob = job + }); !ok { + return + } + + t.Run("CheckArchive", func(t *testing.T) { + data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(data, testData) { + t.Fatal("unexpected data fetched from archive") + } + }) +} diff --git a/schema/float.go b/schema/float.go index 21d772c..5aeb82e 100644 --- a/schema/float.go +++ b/schema/float.go @@ -14,6 +14,7 @@ import ( type Float float64 var NaN Float = Float(math.NaN()) +var nullAsBytes []byte = []byte("null") func (f Float) IsNaN() bool { return math.IsNaN(float64(f)) @@ -22,10 +23,10 @@ func (f Float) IsNaN() bool { // NaN will be serialized to `null`. func (f Float) MarshalJSON() ([]byte, error) { if f.IsNaN() { - return []byte("null"), nil + return nullAsBytes, nil } - return []byte(strconv.FormatFloat(float64(f), 'f', 2, 64)), nil + return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil } // `null` will be unserialized to NaN. @@ -59,8 +60,8 @@ func (f *Float) UnmarshalGQL(v interface{}) error { // NaN will be serialized to `null`. func (f Float) MarshalGQL(w io.Writer) { if f.IsNaN() { - w.Write([]byte(`null`)) + w.Write(nullAsBytes) } else { - w.Write([]byte(strconv.FormatFloat(float64(f), 'f', 2, 64))) + w.Write(strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64)) } }