new test: start/stop job via REST API

This commit is contained in:
Lou Knauer 2022-01-24 10:08:47 +01:00
parent ae3e03f9b9
commit 7359a556d9
4 changed files with 297 additions and 10 deletions

View File

@ -81,13 +81,12 @@ This project uses [gqlgen](https://github.com/99designs/gqlgen) for the GraphQL
### TODO
- [ ] fix frontend
- [ ] write (unit) tests
- [ ] make tokens and sessions (currently based on cookies) expire after some configurable time
- [ ] when authenticating using a JWT, check if that user still exists
- [ ] allow mysql as database and passing the database uri as environment variable
- [ ] allow passing the database uri as environment variable
- [ ] fix InfluxDB MetricDataRepository (new or old line-protocol format? Support node-level metrics only?)
- [ ] support all metric scopes
- [ ] documentation, comments in the code base
- [ ] write more TODOs
- [ ] caching
- [ ] use more prepared statements and [sqrl](https://github.com/elgris/sqrl) instead of *squirrel*
- [ ] replace `github.com/ClusterCockpit/cc-jobarchive` with `github.com/ClusterCockpit/cc-backend` in all import paths

View File

@ -183,8 +183,8 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
}
if config.GetClusterConfig(req.Cluster) == nil {
http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", req.Cluster), http.StatusBadRequest)
if config.GetClusterConfig(req.Cluster) == nil || config.GetPartition(req.Cluster, req.Partition) == nil {
http.Error(rw, fmt.Sprintf("cluster %#v or partition %#v does not exist", req.Cluster, req.Partition), http.StatusBadRequest)
return
}
@ -209,6 +209,10 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
}
if req.State == "" {
req.State = schema.JobStateRunning
}
req.RawResources, err = json.Marshal(req.Resources)
if err != nil {
log.Fatal(err)

283
api_test.go Normal file
View File

@ -0,0 +1,283 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strconv"
"testing"
"github.com/ClusterCockpit/cc-jobarchive/api"
"github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/graph"
"github.com/ClusterCockpit/cc-jobarchive/metricdata"
"github.com/ClusterCockpit/cc-jobarchive/schema"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
)
func setup(t *testing.T) *api.RestApi {
if db != nil {
panic("prefer using sub-tests (`t.Run`) or implement `cleanup` before calling setup twice.")
}
devNull, err := os.Open(os.DevNull)
if err != nil {
t.Fatal(err)
}
// Makes output cleaner
log.SetOutput(devNull)
const testclusterJson = `{
"name": "testcluster",
"partitions": [
{
"name": "default",
"processorType": "Intel Core i7-4770",
"socketsPerNode": 1,
"coresPerSocket": 4,
"threadsPerCore": 2,
"flopRateScalar": 44,
"flopRateSimd": 704,
"memoryBandwidth": 80,
"topology": {
"node": [0, 1, 2, 3, 4, 5, 6, 7],
"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
"core": [[0], [1], [2], [3], [4], [5], [6], [7]],
"accelerators": []
}
}
],
"metricDataRepository": {"kind": "test"},
"metricConfig": [
{
"name": "load_one",
"unit": "load",
"scope": "node",
"timestep": 60,
"peak": 8,
"normal": 0,
"caution": 0,
"alert": 0
}
],
"filterRanges": {
"numNodes": { "from": 1, "to": 1 },
"duration": { "from": 0, "to": 172800 },
"startTime": { "from": "2010-01-01T00:00:00Z", "to": null }
}
}`
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
if err := os.Mkdir(jobarchive, 0777); err != nil {
t.Fatal(err)
}
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
t.Fatal(err)
}
dbfilepath := filepath.Join(tmpdir, "test.db")
f, err := os.Create(dbfilepath)
if err != nil {
t.Fatal(err)
}
f.Close()
db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath))
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
if _, err := db.Exec(JOBS_DB_SCHEMA); err != nil {
t.Fatal(err)
}
if err := config.Init(db, false, programConfig.UiDefaults, jobarchive); err != nil {
t.Fatal(err)
}
if err := metricdata.Init(jobarchive, false); err != nil {
t.Fatal(err)
}
resolver := &graph.Resolver{DB: db}
resolver.Init()
return &api.RestApi{
DB: db,
Resolver: resolver,
}
}
func cleanup() {
log.SetOutput(os.Stderr)
// TODO: Clear all caches, reset all modules, etc...
}
/*
* This function starts a job, stops it, and then reads its data from the job-archive.
* Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
* at least `setup` modifies global state. Log-Output is redirected to /dev/null on purpose.
*/
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)
testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
Unit: "load",
Scope: schema.MetricScopeNode,
Timestep: 60,
Series: []schema.Series{
{
Hostname: "testhost",
Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
},
},
},
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
return testData, nil
}
r := mux.NewRouter()
restapi.MountRoutes(r)
const startJobBody string = `{
"jobId": 123,
"user": "testuser",
"project": "testproj",
"cluster": "testcluster",
"partition": "default",
"arrayJobId": 0,
"numNodes": 1,
"numHwthreads": 8,
"numAcc": 0,
"exclusive": 1,
"monitoringStatus": 1,
"smt": 1,
"tags": [],
"resources": [
{
"hostname": "testhost",
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
}
],
"metaData": null,
"startTime": 123456789
}`
var dbid int64
if ok := t.Run("StartJob", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody)))
recorder := httptest.NewRecorder()
r.ServeHTTP(recorder, req)
response := recorder.Result()
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
var res api.StartJobApiRespone
if err := json.Unmarshal(recorder.Body.Bytes(), &res); err != nil {
t.Fatal(err)
}
job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(res.DBID)))
if err != nil {
t.Fatal(err)
}
if job.JobID != 123 ||
job.User != "testuser" ||
job.Project != "testproj" ||
job.Cluster != "testcluster" ||
job.Partition != "default" ||
job.ArrayJobId != 0 ||
job.NumNodes != 1 ||
job.NumHWThreads != 8 ||
job.NumAcc != 0 ||
job.Exclusive != 1 ||
job.MonitoringStatus != 1 ||
job.SMT != 1 ||
len(job.Tags) != 0 ||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "testhost", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
job.StartTime.Unix() != 123456789 {
t.Fatalf("unexpected job properties: %#v", job)
}
dbid = res.DBID
}); !ok {
return
}
const stopJobBody string = `{
"jobId": 123,
"startTime": 123456789,
"cluster": "testcluster",
"jobState": "completed",
"stopTime": 123457789
}`
var stoppedJob *schema.Job
if ok := t.Run("StopJob", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/api/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody)))
recorder := httptest.NewRecorder()
r.ServeHTTP(recorder, req)
response := recorder.Result()
if response.StatusCode != http.StatusOK {
t.Fatal(response.Status, recorder.Body.String())
}
restapi.OngoingArchivings.Wait()
job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(dbid)))
if err != nil {
t.Fatal(err)
}
if job.State != schema.JobStateCompleted {
t.Fatal("expected job to be completed")
}
if job.Duration != (123457789 - 123456789) {
t.Fatalf("unexpected job properties: %#v", job)
}
stoppedJob = job
}); !ok {
return
}
t.Run("CheckArchive", func(t *testing.T) {
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(data, testData) {
t.Fatal("unexpected data fetched from archive")
}
})
}

View File

@ -14,6 +14,7 @@ import (
type Float float64
var NaN Float = Float(math.NaN())
var nullAsBytes []byte = []byte("null")
func (f Float) IsNaN() bool {
return math.IsNaN(float64(f))
@ -22,10 +23,10 @@ func (f Float) IsNaN() bool {
// NaN will be serialized to `null`.
func (f Float) MarshalJSON() ([]byte, error) {
if f.IsNaN() {
return []byte("null"), nil
return nullAsBytes, nil
}
return []byte(strconv.FormatFloat(float64(f), 'f', 2, 64)), nil
return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil
}
// `null` will be unserialized to NaN.
@ -59,8 +60,8 @@ func (f *Float) UnmarshalGQL(v interface{}) error {
// NaN will be serialized to `null`.
func (f Float) MarshalGQL(w io.Writer) {
if f.IsNaN() {
w.Write([]byte(`null`))
w.Write(nullAsBytes)
} else {
w.Write([]byte(strconv.FormatFloat(float64(f), 'f', 2, 64)))
w.Write(strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64))
}
}