cc-backend/internal/api/api_test.go

442 lines
12 KiB
Go
Raw Permalink Normal View History

2024-04-11 23:04:30 +02:00
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2023-05-04 07:00:30 +02:00
package api_test
2022-01-24 10:08:47 +01:00
import (
"bytes"
"context"
"encoding/json"
2022-09-06 09:31:52 +02:00
"fmt"
2022-01-24 10:08:47 +01:00
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
2022-01-24 10:08:47 +01:00
"testing"
2022-06-22 18:06:02 +02:00
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
2022-06-22 18:06:02 +02:00
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
2022-06-22 18:06:02 +02:00
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
2022-09-06 09:31:52 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/archive"
2023-02-15 13:07:19 +01:00
"github.com/ClusterCockpit/cc-backend/pkg/log"
2022-06-22 18:06:02 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2022-01-24 10:08:47 +01:00
"github.com/gorilla/mux"
2022-03-15 08:29:29 +01:00
_ "github.com/mattn/go-sqlite3"
2022-01-24 10:08:47 +01:00
)
func setup(t *testing.T) *api.RestApi {
2022-09-06 09:31:52 +02:00
const testconfig = `{
2022-09-06 14:40:14 +02:00
"addr": "0.0.0.0:8080",
2022-09-13 15:32:01 +02:00
"validate": false,
2022-09-06 09:31:52 +02:00
"archive": {
"kind": "file",
"path": "./var/job-archive"
2022-09-06 14:40:14 +02:00
},
2023-08-18 09:31:57 +02:00
"jwts": {
"max-age": "2m"
},
2022-09-06 14:40:14 +02:00
"clusters": [
{
"name": "testcluster",
2022-09-20 12:21:20 +02:00
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
}
2023-05-04 07:00:30 +02:00
}
2022-09-06 14:40:14 +02:00
]
2022-09-06 09:31:52 +02:00
}`
2022-01-24 10:08:47 +01:00
const testclusterJson = `{
2022-09-20 12:21:20 +02:00
"name": "testcluster",
"subClusters": [
{
"name": "sc1",
"nodes": "host123,host124,host125",
2022-01-24 10:08:47 +01:00
"processorType": "Intel Core i7-4770",
"socketsPerNode": 1,
"coresPerSocket": 4,
"threadsPerCore": 2,
2023-03-22 07:05:41 +01:00
"flopRateScalar": {
"unit": {
"prefix": "G",
"base": "F/s"
},
"value": 14
},
"flopRateSimd": {
"unit": {
"prefix": "G",
"base": "F/s"
},
"value": 112
},
"memoryBandwidth": {
"unit": {
"prefix": "G",
"base": "B/s"
},
"value": 24
},
"numberOfNodes": 70,
2022-01-24 10:08:47 +01:00
"topology": {
"node": [0, 1, 2, 3, 4, 5, 6, 7],
"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
2022-09-20 12:21:20 +02:00
"core": [[0], [1], [2], [3], [4], [5], [6], [7]]
2022-01-24 10:08:47 +01:00
}
}
],
"metricConfig": [
{
"name": "load_one",
2023-04-07 08:04:40 +02:00
"unit": { "base": ""},
2022-01-24 10:08:47 +01:00
"scope": "node",
"timestep": 60,
2023-03-22 07:05:41 +01:00
"aggregation": "avg",
2022-01-24 10:08:47 +01:00
"peak": 8,
"normal": 0,
"caution": 0,
"alert": 0
}
2022-09-20 12:21:20 +02:00
]
2022-01-24 10:08:47 +01:00
}`
2023-02-15 13:07:19 +01:00
log.Init("info", true)
2022-01-24 10:08:47 +01:00
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
if err := os.Mkdir(jobarchive, 0777); err != nil {
t.Fatal(err)
}
2023-03-27 14:41:00 +02:00
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
t.Fatal(err)
}
2022-01-24 10:08:47 +01:00
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
t.Fatal(err)
}
dbfilepath := filepath.Join(tmpdir, "test.db")
err := repository.MigrateDB("sqlite3", dbfilepath)
if err != nil {
t.Fatal(err)
}
2022-01-24 10:08:47 +01:00
2022-09-06 09:31:52 +02:00
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
t.Fatal(err)
}
config.Init(cfgFilePath)
2022-09-20 12:21:20 +02:00
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
2022-09-06 09:31:52 +02:00
2022-06-22 18:06:02 +02:00
repository.Connect("sqlite3", dbfilepath)
2022-01-24 10:08:47 +01:00
2022-11-08 16:49:45 +01:00
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
2022-01-24 10:08:47 +01:00
t.Fatal(err)
}
if err := metricdata.Init(); err != nil {
2022-01-24 10:08:47 +01:00
t.Fatal(err)
}
archiver.Start(repository.GetJobRepository())
auth.Init()
graph.Init()
2022-02-19 10:28:29 +01:00
return api.New()
2022-01-24 10:08:47 +01:00
}
// cleanup is registered with t.Cleanup by TestRestApi. It is currently an
// empty placeholder and does not undo anything done by setup.
func cleanup() {
	// TODO: Clear all caches, reset all modules, etc...
}
/*
2022-09-06 09:31:52 +02:00
* This function starts a job, stops it, and then reads its data from the job-archive.
* Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
2023-05-04 07:00:30 +02:00
* at least `setup` modifies global state.
2022-01-24 10:08:47 +01:00
*/
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)
testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
2022-11-09 19:47:56 +01:00
Unit: schema.Unit{Base: "load"},
2022-01-24 10:08:47 +01:00
Timestep: 60,
Series: []schema.Series{
{
Hostname: "host123",
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
2022-01-24 10:08:47 +01:00
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
},
},
},
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
2022-01-24 10:08:47 +01:00
return testData, nil
}
r := mux.NewRouter()
2024-07-05 13:16:21 +02:00
r.PathPrefix("/api").Subrouter()
r.StrictSlash(true)
restapi.MountApiRoutes(r)
2022-01-24 10:08:47 +01:00
const startJobBody string = `{
2023-03-27 14:41:00 +02:00
"jobId": 123,
2022-09-06 09:31:52 +02:00
"user": "testuser",
"project": "testproj",
"cluster": "testcluster",
"partition": "default",
2022-03-30 09:39:13 +02:00
"walltime": 3600,
2022-09-06 09:31:52 +02:00
"arrayJobId": 0,
"numNodes": 1,
"numHwthreads": 8,
"numAcc": 0,
"exclusive": 1,
"monitoringStatus": 1,
"smt": 1,
"tags": [{ "type": "testTagType", "name": "testTagName", "scope": "testuser" }],
2022-09-06 09:31:52 +02:00
"resources": [
{
"hostname": "host123",
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
}
],
"metaData": { "jobScript": "blablabla..." },
"startTime": 123456789
2022-01-24 10:08:47 +01:00
}`
var dbid int64
2024-07-05 15:25:24 +02:00
const contextUserKey repository.ContextKey = "user"
contextUserValue := &schema.User{
Username: "testuser",
Projects: make([]string, 0),
Roles: []string{"user"},
AuthType: 0,
AuthSource: 2,
}
2022-01-24 10:08:47 +01:00
if ok := t.Run("StartJob", func(t *testing.T) {
2024-07-05 15:25:24 +02:00
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody)))
2022-01-24 10:08:47 +01:00
recorder := httptest.NewRecorder()
2024-07-05 15:25:24 +02:00
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
2022-01-24 10:08:47 +01:00
response := recorder.Result()
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
2022-02-07 14:20:44 +01:00
var res api.StartJobApiResponse
2022-01-24 10:08:47 +01:00
if err := json.Unmarshal(recorder.Body.Bytes(), &res); err != nil {
t.Fatal(err)
}
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID)))
2022-01-24 10:08:47 +01:00
if err != nil {
t.Fatal(err)
}
job.Tags, err = resolver.Job().Tags(ctx, job)
2022-02-08 12:49:28 +01:00
if err != nil {
t.Fatal(err)
}
2022-01-24 10:08:47 +01:00
if job.JobID != 123 ||
job.User != "testuser" ||
job.Project != "testproj" ||
job.Cluster != "testcluster" ||
job.SubCluster != "sc1" ||
2023-05-04 07:00:30 +02:00
job.Partition != "default" ||
job.Walltime != 3600 ||
job.ArrayJobId != 0 ||
2022-01-24 10:08:47 +01:00
job.NumNodes != 1 ||
2023-05-04 07:00:30 +02:00
job.NumHWThreads != 8 ||
job.NumAcc != 0 ||
2022-01-24 10:08:47 +01:00
job.Exclusive != 1 ||
job.MonitoringStatus != 1 ||
2023-05-04 07:00:30 +02:00
job.SMT != 1 ||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
2022-01-24 10:08:47 +01:00
job.StartTime.Unix() != 123456789 {
t.Fatalf("unexpected job properties: %#v", job)
}
if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
2022-02-08 12:49:28 +01:00
t.Fatalf("unexpected tags: %#v", job.Tags)
}
2022-01-24 10:08:47 +01:00
dbid = res.DBID
}); !ok {
return
}
const stopJobBody string = `{
2023-03-27 11:11:14 +02:00
"jobId": 123,
2022-01-24 10:08:47 +01:00
"startTime": 123456789,
"cluster": "testcluster",
"jobState": "completed",
"stopTime": 123457789
}`
var stoppedJob *schema.Job
if ok := t.Run("StopJob", func(t *testing.T) {
2024-07-05 15:42:08 +02:00
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody)))
2022-01-24 10:08:47 +01:00
recorder := httptest.NewRecorder()
2024-07-05 15:42:08 +02:00
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
2022-01-24 10:08:47 +01:00
response := recorder.Result()
if response.StatusCode != http.StatusOK {
t.Fatal(response.Status, recorder.Body.String())
}
archiver.WaitForArchiving()
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
2022-01-24 10:08:47 +01:00
if err != nil {
t.Fatal(err)
}
if job.State != schema.JobStateCompleted {
t.Fatal("expected job to be completed")
}
if job.Duration != (123457789 - 123456789) {
t.Fatalf("unexpected job properties: %#v", job)
}
job.MetaData, err = restapi.JobRepository.FetchMetadata(job)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(job.MetaData, map[string]string{"jobScript": "blablabla..."}) {
t.Fatalf("unexpected job.metaData: %#v", job.MetaData)
}
2022-01-24 10:08:47 +01:00
stoppedJob = job
}); !ok {
return
}
t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
2022-01-24 10:08:47 +01:00
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(data, testData) {
t.Fatal("unexpected data fetched from archive")
}
})
t.Run("CheckDoubleStart", func(t *testing.T) {
// Starting a job with the same jobId and cluster should only be allowed if the startTime is far appart!
body := strings.Replace(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`, -1)
2024-07-05 15:42:08 +02:00
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(body)))
recorder := httptest.NewRecorder()
2024-07-05 15:42:08 +02:00
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
response := recorder.Result()
if response.StatusCode != http.StatusUnprocessableEntity {
t.Fatal(response.Status, recorder.Body.String())
}
})
2022-07-12 10:22:22 +02:00
2023-05-04 07:00:30 +02:00
const startJobBodyFailed string = `{
"jobId": 12345,
"user": "testuser",
"project": "testproj",
"cluster": "testcluster",
"partition": "default",
"walltime": 3600,
"numNodes": 1,
"exclusive": 1,
"monitoringStatus": 1,
"smt": 1,
"resources": [
{
"hostname": "host123"
}
],
"startTime": 12345678
}`
2023-05-04 07:00:30 +02:00
ok := t.Run("StartJobFailed", func(t *testing.T) {
2024-07-05 15:42:08 +02:00
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBodyFailed)))
2022-07-12 10:22:22 +02:00
recorder := httptest.NewRecorder()
2024-07-05 15:42:08 +02:00
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
2022-07-12 10:22:22 +02:00
response := recorder.Result()
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
})
if !ok {
t.Fatal("subtest failed")
}
2023-05-04 07:00:30 +02:00
const stopJobBodyFailed string = `{
2023-03-27 11:11:14 +02:00
"jobId": 12345,
2022-07-12 10:22:22 +02:00
"cluster": "testcluster",
"jobState": "failed",
"stopTime": 12355678
}`
2023-05-04 07:00:30 +02:00
ok = t.Run("StopJobFailed", func(t *testing.T) {
2024-07-05 15:42:08 +02:00
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBodyFailed)))
2022-07-12 10:22:22 +02:00
recorder := httptest.NewRecorder()
2024-07-05 15:42:08 +02:00
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
2022-07-12 10:22:22 +02:00
response := recorder.Result()
if response.StatusCode != http.StatusOK {
t.Fatal(response.Status, recorder.Body.String())
}
archiver.WaitForArchiving()
2022-07-12 10:22:22 +02:00
jobid, cluster := int64(12345), "testcluster"
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
if err != nil {
t.Fatal(err)
}
2022-11-25 11:48:30 +01:00
if job.State != schema.JobStateFailed {
t.Fatal("expected job to be failed")
2022-07-12 10:22:22 +02:00
}
})
if !ok {
t.Fatal("subtest failed")
}
2022-01-24 10:08:47 +01:00
}