Make metaData a map[string]string; Resolve explicitly

Lou Knauer 2022-03-08 11:53:24 +01:00
parent 08d760361d
commit 9535c11dc4
10 changed files with 89 additions and 33 deletions
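In short: BaseJob.MetaData changes from interface{} to map[string]string, a new RawMetaData []byte field carries the raw meta_data column, and metadata is no longer scanned together with the job row. Instead it is resolved explicitly through JobRepository.FetchMetadata, which the new GraphQL resolver calls on demand. A minimal, self-contained sketch (using a simplified stand-in struct, not the real schema.BaseJob) of what the new field layout means for JSON output: the raw bytes stay hidden via json:"-", while the typed map serializes as an object.

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for the new BaseJob fields (illustration only).
type job struct {
	RawMetaData []byte            `json:"-" db:"meta_data"` // raw DB column, never sent to clients
	MetaData    map[string]string `json:"metaData"`         // decoded on demand
}

func main() {
	j := job{RawMetaData: []byte(`{"jobScript":"blablabla..."}`)}

	// "Resolve explicitly": decode the raw column into the typed map only when needed.
	if err := json.Unmarshal(j.RawMetaData, &j.MetaData); err != nil {
		panic(err)
	}

	out, _ := json.Marshal(j)
	fmt.Println(string(out)) // {"metaData":{"jobScript":"blablabla..."}}
}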

View File

@@ -285,12 +285,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	req.RawResources, err = json.Marshal(req.Resources)
-	if err != nil {
-		handleError(fmt.Errorf("basically impossible: %w", err), http.StatusBadRequest, rw)
-		return
-	}
-
 	id, err := api.JobRepository.Start(&req)
 	if err != nil {
 		handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)

View File

@@ -177,7 +177,7 @@ func TestRestApi(t *testing.T) {
 				"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
 			}
 		],
-		"metaData": null,
+		"metaData": { "jobScript": "blablabla..." },
 		"startTime": 123456789
 	}`
@@ -260,7 +260,6 @@ func TestRestApi(t *testing.T) {
 		}
 		if job.State != schema.JobStateCompleted {
-			print("STATE:" + job.State)
 			t.Fatal("expected job to be completed")
 		}
@@ -268,6 +267,15 @@ func TestRestApi(t *testing.T) {
 			t.Fatalf("unexpected job properties: %#v", job)
 		}
 
+		job.MetaData, err = restapi.JobRepository.FetchMetadata(job)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if !reflect.DeepEqual(job.MetaData, map[string]string{"jobScript": "blablabla..."}) {
+			t.Fatalf("unexpected job.metaData: %#v", job.MetaData)
+		}
+
 		stoppedJob = job
 	}); !ok {
 		return
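The updated test compares the fetched metadata with reflect.DeepEqual because Go maps are not comparable with ==. A standalone illustration of the check used above:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	got := map[string]string{"jobScript": "blablabla..."}
	want := map[string]string{"jobScript": "blablabla..."}

	// Maps cannot be compared with ==, so the test relies on reflect.DeepEqual.
	fmt.Println(reflect.DeepEqual(got, want)) // true
}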

@@ -1 +1 @@
-Subproject commit b7e422e49ece2d9eeef7354ee595c54c474a4697
+Subproject commit fdf89ac36ff7eb12fabd483ad6e50d55cc118b8b

View File

@@ -59,6 +59,8 @@ models:
     fields:
       tags:
         resolver: true
+      metaData:
+        resolver: true
   NullableFloat: { model: "github.com/ClusterCockpit/cc-backend/schema.Float" }
   MetricScope: { model: "github.com/ClusterCockpit/cc-backend/schema.MetricScope" }
   JobStatistics: { model: "github.com/ClusterCockpit/cc-backend/schema.JobStatistics" }

View File

@@ -234,6 +234,7 @@ type ComplexityRoot struct {
 }
 
 type JobResolver interface {
+	MetaData(ctx context.Context, obj *schema.Job) (interface{}, error)
 	Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error)
 }
 
 type MutationResolver interface {
@@ -3043,14 +3044,14 @@ func (ec *executionContext) _Job_metaData(ctx context.Context, field graphql.Col
 		Object:     "Job",
 		Field:      field,
 		Args:       nil,
-		IsMethod:   false,
-		IsResolver: false,
+		IsMethod:   true,
+		IsResolver: true,
 	}
 
 	ctx = graphql.WithFieldContext(ctx, fc)
 	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
 		ctx = rctx // use context from middleware stack in children
-		return obj.MetaData, nil
+		return ec.resolvers.Job().MetaData(rctx, obj)
 	})
 	if err != nil {
 		ec.Error(ctx, err)
@@ -7750,7 +7751,16 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj
 				atomic.AddUint32(&invalids, 1)
 			}
 		case "metaData":
-			out.Values[i] = ec._Job_metaData(ctx, field, obj)
+			field := field
+			out.Concurrently(i, func() (res graphql.Marshaler) {
+				defer func() {
+					if r := recover(); r != nil {
+						ec.Error(ctx, ec.Recover(ctx, r))
+					}
+				}()
+				res = ec._Job_metaData(ctx, field, obj)
+				return res
+			})
 		case "tags":
 			field := field
 			out.Concurrently(i, func() (res graphql.Marshaler) {

View File

@@ -18,6 +18,10 @@ import (
 	"github.com/ClusterCockpit/cc-backend/schema"
 )
 
+func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
+	return r.Repo.FetchMetadata(obj)
+}
+
 func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
 	return r.Repo.GetTags(&obj.ID)
 }

View File

@@ -219,6 +219,11 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
 		return err
 	}
 
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		return err
+	}
+
 	if err := repository.SanityChecks(&job.BaseJob); err != nil {
 		return err
 	}

View File

@@ -93,6 +93,10 @@ func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobDa
 	if err != nil {
 		return err
 	}
 
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		return err
+	}
 	if err := SanityChecks(&job.BaseJob); err != nil {
 		return err
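One subtlety of the marshal step added here and in loadJob: json.Marshal of a nil map[string]string yields the JSON literal null rather than empty bytes, so meta_data is still written for jobs that carry no metadata. FetchMetadata in the next file handles that fine, since unmarshalling null leaves the map nil. A small standalone check of this stdlib behavior:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var metaData map[string]string // nil map, as for a job without metadata

	raw, err := json.Marshal(metaData)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q (len %d)\n", raw, len(raw)) // "null" (len 4)

	// Unmarshalling that null back leaves the destination map nil.
	var decoded map[string]string
	if err := json.Unmarshal(raw, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded == nil) // true
}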

View File

@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"strconv"
 	"time"
@@ -29,7 +30,7 @@ func (r *JobRepository) Init() error {
 var jobColumns []string = []string{
 	"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id",
 	"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
-	"job.duration", "job.resources", "job.meta_data",
+	"job.duration", "job.resources", // "job.meta_data",
 }
 
 func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
@@ -37,7 +38,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
 	if err := row.Scan(
 		&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
 		&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
-		&job.Duration, &job.RawResources, &job.MetaData); err != nil {
+		&job.Duration, &job.RawResources /*&job.MetaData*/); err != nil {
 		return nil, err
 	}
@@ -54,6 +55,23 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
 	return job, nil
 }
 
+func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
+	if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
+		RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
+		return nil, err
+	}
+
+	if len(job.RawMetaData) == 0 {
+		return nil, nil
+	}
+
+	if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
+		return nil, err
+	}
+
+	return job.MetaData, nil
+}
+
 // Find executes a SQL query to find a specific batch job.
 // The job is queried using the batch job id, the cluster name,
 // and the start time of the job in UNIX epoch time seconds.
@@ -91,6 +109,16 @@ func (r *JobRepository) FindById(
 // Start inserts a new job in the table, returning the unique job ID.
 // Statistics are not transfered!
 func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
+	job.RawResources, err = json.Marshal(job.Resources)
+	if err != nil {
+		return -1, fmt.Errorf("encoding resources field failed: %w", err)
+	}
+
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		return -1, fmt.Errorf("encoding metaData field failed: %w", err)
+	}
+
 	res, err := r.DB.NamedExec(`INSERT INTO job (
 		job_id, user, project, cluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
 		exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data
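Since scanJob no longer reads meta_data, any caller that needs metadata has to resolve it through FetchMetadata. A hedged usage sketch, assuming the import path github.com/ClusterCockpit/cc-backend/repository and that FindById returns (*schema.Job, error); neither the path nor that signature is shown in this hunk.

package example

import (
	"fmt"
	"log"

	"github.com/ClusterCockpit/cc-backend/repository" // assumed import path
)

// Hypothetical helper: resolve a job's metadata explicitly after loading it.
func printJobScript(repo *repository.JobRepository, jobID int64) {
	job, err := repo.FindById(jobID) // assumed signature: (int64) (*schema.Job, error)
	if err != nil {
		log.Fatal(err)
	}

	metaData, err := repo.FetchMetadata(job)
	if err != nil {
		log.Fatal(err)
	}
	if metaData == nil {
		fmt.Println("no metadata stored for this job")
		return
	}
	fmt.Println("jobScript:", metaData["jobScript"])
}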

View File

@@ -27,7 +27,8 @@ type BaseJob struct {
 	Tags         []*Tag            `json:"tags"`
 	RawResources []byte            `json:"-" db:"resources"`
 	Resources    []*Resource       `json:"resources"`
-	MetaData     interface{}       `json:"metaData" db:"meta_data"`
+	RawMetaData  []byte            `json:"-" db:"meta_data"`
+	MetaData     map[string]string `json:"metaData"`
 }
 
 // This type is used as the GraphQL interface and using sqlx as a table row.