// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package api_test

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/api"
	"github.com/ClusterCockpit/cc-backend/internal/archiver"
	"github.com/ClusterCockpit/cc-backend/internal/auth"
	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/graph"
	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/gorilla/mux"

	_ "github.com/mattn/go-sqlite3"
)

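// setup spins up a minimal cc-backend instance for the REST API tests: it
// writes a temporary configuration, cluster definition, and SQLite database
// into t.TempDir(), initializes all subsystems the API depends on, and
// returns a ready-to-use RestApi.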
func setup(t *testing.T) *api.RestApi {
	const testconfig = `{
	"addr": "0.0.0.0:8080",
	"validate": false,
	"archive": {
		"kind": "file",
		"path": "./var/job-archive"
	},
	"jwts": {
		"max-age": "2m"
	},
	"apiAllowedIPs": [
		"*"
	],
	"clusters": [
		{
			"name": "testcluster",
			"metricDataRepository": {"kind": "test", "url": "bla:8081"},
			"filterRanges": {
				"numNodes": { "from": 1, "to": 64 },
				"duration": { "from": 0, "to": 86400 },
				"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
			}
		}
	]
}`

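	// One cluster "testcluster" with a single sub-cluster "sc1" (whose node
	// list includes host123) and one node-scoped metric "load_one". The
	// StartJob subtest relies on host123 resolving to sub-cluster sc1.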
	const testclusterJson = `{
	"name": "testcluster",
	"subClusters": [
		{
			"name": "sc1",
			"nodes": "host123,host124,host125",
			"processorType": "Intel Core i7-4770",
			"socketsPerNode": 1,
			"coresPerSocket": 4,
			"threadsPerCore": 2,
			"flopRateScalar": {
				"unit": {
					"prefix": "G",
					"base": "F/s"
				},
				"value": 14
			},
			"flopRateSimd": {
				"unit": {
					"prefix": "G",
					"base": "F/s"
				},
				"value": 112
			},
			"memoryBandwidth": {
				"unit": {
					"prefix": "G",
					"base": "B/s"
				},
				"value": 24
			},
			"numberOfNodes": 70,
			"topology": {
				"node": [0, 1, 2, 3, 4, 5, 6, 7],
				"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
				"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
				"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
				"core": [[0], [1], [2], [3], [4], [5], [6], [7]]
			}
		}
	],
	"metricConfig": [
		{
			"name": "load_one",
			"unit": { "base": ""},
			"scope": "node",
			"timestep": 60,
			"aggregation": "avg",
			"peak": 8,
			"normal": 0,
			"caution": 0,
			"alert": 0
		}
	]
}`

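	// Set up logging and a file-based job archive in the temp dir:
	// version.txt must hold the archive format version (2 here), and each
	// cluster directory gets its cluster.json.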
	log.Init("info", true)
	tmpdir := t.TempDir()
	jobarchive := filepath.Join(tmpdir, "job-archive")
	if err := os.Mkdir(jobarchive, 0777); err != nil {
		t.Fatal(err)
	}

	if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil {
		t.Fatal(err)
	}

	if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
		t.Fatal(err)
	}

	if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
		t.Fatal(err)
	}

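	// Create an empty SQLite database, apply all schema migrations, and
	// write the test configuration to disk.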
	dbfilepath := filepath.Join(tmpdir, "test.db")
	err := repository.MigrateDB("sqlite3", dbfilepath)
	if err != nil {
		t.Fatal(err)
	}

	cfgFilePath := filepath.Join(tmpdir, "config.json")
	if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
		t.Fatal(err)
	}

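	// Initialize the global subsystems in dependency order: configuration,
	// database connection, job archive, and metric data repositories.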
	config.Init(cfgFilePath)
	archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)

	repository.Connect("sqlite3", dbfilepath)

	if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
		t.Fatal(err)
	}

	if err := metricdata.Init(); err != nil {
		t.Fatal(err)
	}

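	// Start the background archiving worker and initialize authentication
	// and the GraphQL resolver before handing out the REST API instance.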
	archiver.Start(repository.GetJobRepository())
	auth.Init()
	graph.Init()

	return api.New()
}

func cleanup() {
	// TODO: Clear all caches, reset all modules, etc...
}

/*
 * This function starts a job, stops it, and then reads its data from the job-archive.
 * Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
 * at least `setup` modifies global state.
 */
func TestRestApi(t *testing.T) {
	restapi := setup(t)
	t.Cleanup(cleanup)

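	// Canned metric data: the mocked loader below returns this for every
	// query, and the CheckArchive subtest expects to read it back unchanged.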
	testData := schema.JobData{
		"load_one": map[schema.MetricScope]*schema.JobMetric{
			schema.MetricScopeNode: {
				Unit:     schema.Unit{Base: "load"},
				Timestep: 60,
				Series: []schema.Series{
					{
						Hostname:   "host123",
						Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
						Data:       []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
					},
				},
			},
		},
	}

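	// Stub out the "test" metric data repository (configured above) so no
	// real metric store is contacted during the test.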
	metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
		return testData, nil
	}

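	// Mount the REST routes on a fresh router; the subtests drive the
	// handlers directly through httptest, so no HTTP server is started.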
	r := mux.NewRouter()
	r.PathPrefix("/api").Subrouter()
	r.StrictSlash(true)
	restapi.MountApiRoutes(r)

	var TestJobId int64 = 123
	var TestClusterName string = "testcluster"
	var TestStartTime int64 = 123456789

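	// Request body for starting a job; jobId, cluster, and startTime must
	// match the lookup values above.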
	const startJobBody string = `{
	"jobId": 123,
	"user": "testuser",
	"project": "testproj",
	"cluster": "testcluster",
	"partition": "default",
	"walltime": 3600,
	"arrayJobId": 0,
	"numNodes": 1,
	"numHwthreads": 8,
	"numAcc": 0,
	"exclusive": 1,
	"monitoringStatus": 1,
	"smt": 1,
	"tags": [{ "type": "testTagType", "name": "testTagName", "scope": "testuser" }],
	"resources": [
		{
			"hostname": "host123",
			"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
		}
	],
	"metaData": { "jobScript": "blablabla..." },
	"startTime": 123456789
}`

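	// The API handlers read the authenticated user from the request context;
	// inject a plain "user"-role account so the requests are authorized.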
	const contextUserKey repository.ContextKey = "user"
	contextUserValue := &schema.User{
		Username:   "testuser",
		Projects:   make([]string, 0),
		Roles:      []string{"user"},
		AuthType:   0,
		AuthSource: 2,
	}

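	// StartJob: POST the job and verify that every field arrived in the
	// repository, including the sub-cluster resolved from the hostname.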
	if ok := t.Run("StartJob", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody)))
		recorder := httptest.NewRecorder()

		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)

		r.ServeHTTP(recorder, req.WithContext(ctx))
		response := recorder.Result()
		if response.StatusCode != http.StatusCreated {
			t.Fatal(response.Status, recorder.Body.String())
		}
		resolver := graph.GetResolverInstance()
		job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
		if err != nil {
			t.Fatal(err)
		}

		job.Tags, err = resolver.Job().Tags(ctx, job)
		if err != nil {
			t.Fatal(err)
		}

		if job.JobID != 123 ||
			job.User != "testuser" ||
			job.Project != "testproj" ||
			job.Cluster != "testcluster" ||
			job.SubCluster != "sc1" ||
			job.Partition != "default" ||
			job.Walltime != 3600 ||
			job.ArrayJobId != 0 ||
			job.NumNodes != 1 ||
			job.NumHWThreads != 8 ||
			job.NumAcc != 0 ||
			job.Exclusive != 1 ||
			job.MonitoringStatus != 1 ||
			job.SMT != 1 ||
			!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
			job.StartTime.Unix() != 123456789 {
			t.Fatalf("unexpected job properties: %#v", job)
		}

		if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
			t.Fatalf("unexpected tags: %#v", job.Tags)
		}
	}); !ok {
		return
	}

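	// Stop request for the job started above; stopTime minus startTime
	// determines the expected duration (1000 seconds).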
	const stopJobBody string = `{
	"jobId": 123,
	"startTime": 123456789,
	"cluster": "testcluster",

	"jobState": "completed",
	"stopTime": 123457789
}`

	var stoppedJob *schema.Job

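	// StopJob: mark the job completed, wait for the archiver to finish,
	// then verify state, duration, and metadata in the repository.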
	if ok := t.Run("StopJob", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody)))
		recorder := httptest.NewRecorder()

		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)

		r.ServeHTTP(recorder, req.WithContext(ctx))
		response := recorder.Result()
		if response.StatusCode != http.StatusOK {
			t.Fatal(response.Status, recorder.Body.String())
		}

		archiver.WaitForArchiving()
		job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
		if err != nil {
			t.Fatal(err)
		}

		if job.State != schema.JobStateCompleted {
			t.Fatal("expected job to be completed")
		}

		if job.Duration != (123457789 - 123456789) {
			t.Fatalf("unexpected job properties: %#v", job)
		}

		job.MetaData, err = restapi.JobRepository.FetchMetadata(job)
		if err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(job.MetaData, map[string]string{"jobScript": "blablabla..."}) {
			t.Fatalf("unexpected job.metaData: %#v", job.MetaData)
		}

		stoppedJob = job
	}); !ok {
		return
	}

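	// CheckArchive: loading data for the stopped job through the dispatcher
	// must return exactly the canned testData that was archived.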
	t.Run("CheckArchive", func(t *testing.T) {
		data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
		if err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(data, testData) {
			t.Fatal("unexpected data fetched from archive")
		}
	})

	t.Run("CheckDoubleStart", func(t *testing.T) {
		// Starting a job with the same jobId and cluster should only be allowed if the startTime is far apart!
		body := strings.Replace(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`, -1)

		req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(body)))
		recorder := httptest.NewRecorder()

		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)

		r.ServeHTTP(recorder, req.WithContext(ctx))
		response := recorder.Result()
		if response.StatusCode != http.StatusUnprocessableEntity {
			t.Fatal(response.Status, recorder.Body.String())
		}
	})

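	// A second, minimal job used to exercise the failure path: no tags,
	// no hwthreads, and a different jobId and startTime.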
	const startJobBodyFailed string = `{
	"jobId": 12345,
	"user": "testuser",
	"project": "testproj",
	"cluster": "testcluster",
	"partition": "default",
	"walltime": 3600,
	"numNodes": 1,
	"exclusive": 1,
	"monitoringStatus": 1,
	"smt": 1,
	"resources": [
		{
			"hostname": "host123"
		}
	],
	"startTime": 12345678
}`

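	// StartJobFailed: despite the name, starting this job must succeed
	// (201 Created); it is only stopped as "failed" in the next subtest.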
	ok := t.Run("StartJobFailed", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBodyFailed)))
		recorder := httptest.NewRecorder()

		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)

		r.ServeHTTP(recorder, req.WithContext(ctx))
		response := recorder.Result()
		if response.StatusCode != http.StatusCreated {
			t.Fatal(response.Status, recorder.Body.String())
		}
	})
	if !ok {
		t.Fatal("subtest failed")
	}

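	// Presumably gives the job-start processing time to settle before the
	// stop request, which has to find the job without a startTime.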
	time.Sleep(1 * time.Second)

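	// Stop request for the second job; note there is no "startTime" field,
	// so the handler must locate the job by jobId and cluster alone.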
	const stopJobBodyFailed string = `{
	"jobId": 12345,
	"cluster": "testcluster",

	"jobState": "failed",
	"stopTime": 12355678
}`

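	// StopJobFailed: stop the job as "failed" and verify the final state
	// once archiving has completed.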
	ok = t.Run("StopJobFailed", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBodyFailed)))
		recorder := httptest.NewRecorder()

		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)

		r.ServeHTTP(recorder, req.WithContext(ctx))
		response := recorder.Result()
		if response.StatusCode != http.StatusOK {
			t.Fatal(response.Status, recorder.Body.String())
		}

		archiver.WaitForArchiving()
		jobid, cluster := int64(12345), "testcluster"
		job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
		if err != nil {
			t.Fatal(err)
		}

		if job.State != schema.JobStateFailed {
			t.Fatal("expected job to be failed")
		}
	})
	if !ok {
		t.Fatal("subtest failed")
	}
}