mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-30 16:16:07 +02:00
Compare commits
5 Commits
v1.4.4
...
37-provide
Author | SHA1 | Date | |
---|---|---|---|
03a496d477 | |||
d6fd265518 | |||
3bb5ae696a | |||
578a270932 | |||
77d550ca4b |
9
go.mod
9
go.mod
@@ -18,6 +18,7 @@ require (
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.12.2
|
||||
github.com/jmoiron/sqlx v1.3.5
|
||||
github.com/mattn/go-sqlite3 v1.14.16
|
||||
github.com/minio/minio-go/v7 v7.0.63
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/prometheus/common v0.40.0
|
||||
github.com/qustavo/sqlhooks/v2 v2.1.0
|
||||
@@ -39,6 +40,7 @@ require (
|
||||
github.com/containerd/containerd v1.6.18 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.12.4 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.3 // indirect
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.20.0 // indirect
|
||||
@@ -56,10 +58,14 @@ require (
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/jpillora/backoff v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.16.7 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
|
||||
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
||||
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/minio/md5-simd v1.1.2 // indirect
|
||||
github.com/minio/sha256-simd v1.0.1 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
@@ -69,7 +75,9 @@ require (
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||
github.com/rs/xid v1.5.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
github.com/sosodev/duration v1.2.0 // indirect
|
||||
github.com/swaggo/files v1.0.0 // indirect
|
||||
github.com/urfave/cli/v2 v2.25.7 // indirect
|
||||
@@ -83,6 +91,7 @@ require (
|
||||
golang.org/x/tools v0.16.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
sigs.k8s.io/yaml v1.4.0 // indirect
|
||||
|
21
go.sum
21
go.sum
@@ -402,6 +402,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
|
||||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
@@ -817,6 +819,11 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
|
||||
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
|
||||
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@@ -888,6 +895,12 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
|
||||
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
|
||||
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
|
||||
github.com/minio/minio-go/v7 v7.0.63 h1:GbZ2oCvaUdgT5640WJOpyDhhDxvknAJU2/T3yurwcbQ=
|
||||
github.com/minio/minio-go/v7 v7.0.63/go.mod h1:Q6X7Qjb7WMhvG65qKf4gUgA5XaiSox74kR1uAEjxRS4=
|
||||
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
|
||||
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
|
||||
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
@@ -1067,6 +1080,8 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
@@ -1097,8 +1112,9 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
@@ -1566,6 +1582,7 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220317061510-51cd9980dadf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
@@ -1870,6 +1887,8 @@ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKW
|
||||
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
|
||||
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
|
||||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
|
||||
|
@@ -5,9 +5,16 @@
|
||||
package archive
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -52,9 +59,69 @@ type JobContainer struct {
|
||||
Data *schema.JobData
|
||||
}
|
||||
|
||||
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
|
||||
var ar ArchiveBackend
|
||||
var useArchive bool
|
||||
var (
|
||||
cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
|
||||
ar ArchiveBackend
|
||||
useArchive bool
|
||||
)
|
||||
|
||||
func getPath(
|
||||
job *schema.Job,
|
||||
rootPath string,
|
||||
file string,
|
||||
) string {
|
||||
return filepath.Join(
|
||||
getDirectory(job, rootPath), file)
|
||||
}
|
||||
|
||||
func getDirectory(
|
||||
job *schema.Job,
|
||||
rootPath string,
|
||||
) string {
|
||||
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
||||
|
||||
return filepath.Join(
|
||||
rootPath,
|
||||
job.Cluster,
|
||||
lvl1, lvl2,
|
||||
strconv.FormatInt(job.StartTime.Unix(), 10))
|
||||
}
|
||||
|
||||
func loadJobMeta(b []byte) (*schema.JobMeta, error) {
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
|
||||
return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return DecodeJobMeta(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func loadJobData(f io.Reader, key string, isCompressed bool) (schema.JobData, error) {
|
||||
if isCompressed {
|
||||
r, err := gzip.NewReader(f)
|
||||
if err != nil {
|
||||
log.Errorf(" %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, r); err != nil {
|
||||
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return DecodeJobData(r, key)
|
||||
} else {
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
|
||||
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
return DecodeJobData(bufio.NewReader(f), key)
|
||||
}
|
||||
}
|
||||
|
||||
func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
||||
useArchive = !disableArchive
|
||||
@@ -95,8 +162,8 @@ func GetHandle() ArchiveBackend {
|
||||
func LoadAveragesFromArchive(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
data [][]schema.Float) error {
|
||||
|
||||
data [][]schema.Float,
|
||||
) error {
|
||||
metaFile, err := ar.LoadJobMeta(job)
|
||||
if err != nil {
|
||||
log.Warn("Error while loading job metadata from archiveBackend")
|
||||
@@ -115,7 +182,6 @@ func LoadAveragesFromArchive(
|
||||
}
|
||||
|
||||
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
|
||||
|
||||
metaFile, err := ar.LoadJobMeta(job)
|
||||
if err != nil {
|
||||
log.Warn("Error while loading job metadata from archiveBackend")
|
||||
@@ -128,7 +194,6 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
|
||||
// If the job is archived, find its `meta.json` file and override the tags list
|
||||
// in that JSON file. If the job is not archived, nothing is done.
|
||||
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
|
||||
|
||||
if job.State == schema.JobStateRunning || !useArchive {
|
||||
return nil
|
||||
}
|
||||
|
@@ -5,9 +5,7 @@
|
||||
package archive
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -20,7 +18,6 @@ import (
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -43,80 +40,7 @@ type clusterInfo struct {
|
||||
diskSize float64
|
||||
}
|
||||
|
||||
func getDirectory(
|
||||
job *schema.Job,
|
||||
rootPath string,
|
||||
) string {
|
||||
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
||||
|
||||
return filepath.Join(
|
||||
rootPath,
|
||||
job.Cluster,
|
||||
lvl1, lvl2,
|
||||
strconv.FormatInt(job.StartTime.Unix(), 10))
|
||||
}
|
||||
|
||||
func getPath(
|
||||
job *schema.Job,
|
||||
rootPath string,
|
||||
file string) string {
|
||||
|
||||
return filepath.Join(
|
||||
getDirectory(job, rootPath), file)
|
||||
}
|
||||
|
||||
func loadJobMeta(filename string) (*schema.JobMeta, error) {
|
||||
|
||||
b, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
log.Errorf("loadJobMeta() > open file error: %v", err)
|
||||
return &schema.JobMeta{}, err
|
||||
}
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
|
||||
return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return DecodeJobMeta(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
|
||||
f, err := os.Open(filename)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadJobData()- %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if isCompressed {
|
||||
r, err := gzip.NewReader(f)
|
||||
if err != nil {
|
||||
log.Errorf(" %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, r); err != nil {
|
||||
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return DecodeJobData(r, filename)
|
||||
} else {
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
|
||||
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
||||
}
|
||||
}
|
||||
return DecodeJobData(bufio.NewReader(f), filename)
|
||||
}
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
||||
|
||||
var config FsArchiveConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Warnf("Init() > Unmarshal error: %#v", err)
|
||||
@@ -242,7 +166,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool {
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) Clean(before int64, after int64) {
|
||||
|
||||
if after == 0 {
|
||||
after = math.MaxInt64
|
||||
}
|
||||
@@ -358,7 +281,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) CompressLast(starttime int64) int64 {
|
||||
|
||||
filename := filepath.Join(fsa.path, "compress.txt")
|
||||
b, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
@@ -378,7 +300,7 @@ func (fsa *FsArchive) CompressLast(starttime int64) int64 {
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||
var isCompressed bool = true
|
||||
isCompressed := true
|
||||
filename := getPath(job, fsa.path, "data.json.gz")
|
||||
|
||||
if !util.CheckFileExists(filename) {
|
||||
@@ -386,31 +308,38 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||
isCompressed = false
|
||||
}
|
||||
|
||||
return loadJobData(filename, isCompressed)
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadJobData()- %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
return loadJobData(f, filename, isCompressed)
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
|
||||
filename := getPath(job, fsa.path, "meta.json")
|
||||
return loadJobMeta(filename)
|
||||
b, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
log.Errorf("loadJobMeta() > open file error: %v", err)
|
||||
return &schema.JobMeta{}, err
|
||||
}
|
||||
return loadJobMeta(b)
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
|
||||
|
||||
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
|
||||
if err != nil {
|
||||
log.Errorf("LoadClusterCfg() > open file error: %v", err)
|
||||
// if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
|
||||
log.Warnf("Validate cluster config: %v\n", err)
|
||||
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
|
||||
}
|
||||
}
|
||||
// }
|
||||
return DecodeCluster(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
||||
|
||||
ch := make(chan JobContainer)
|
||||
go func() {
|
||||
clustersDir, err := os.ReadDir(fsa.path)
|
||||
@@ -447,7 +376,11 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
||||
|
||||
for _, startTimeDir := range startTimeDirs {
|
||||
if startTimeDir.IsDir() {
|
||||
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
||||
b, err := os.ReadFile(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
||||
if err != nil {
|
||||
log.Errorf("loadJobMeta() > open file error: %v", err)
|
||||
}
|
||||
job, err := loadJobMeta(b)
|
||||
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
|
||||
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
||||
}
|
||||
@@ -461,7 +394,13 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
||||
isCompressed = false
|
||||
}
|
||||
|
||||
data, err := loadJobData(filename, isCompressed)
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadJobData()- %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
data, err := loadJobData(f, filename, isCompressed)
|
||||
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
|
||||
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
||||
}
|
||||
@@ -481,7 +420,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
|
||||
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
@@ -510,8 +448,8 @@ func (fsa *FsArchive) GetClusters() []string {
|
||||
|
||||
func (fsa *FsArchive) ImportJob(
|
||||
jobMeta *schema.JobMeta,
|
||||
jobData *schema.JobData) error {
|
||||
|
||||
jobData *schema.JobData,
|
||||
) error {
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
|
@@ -7,6 +7,7 @@ package archive
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -30,6 +31,7 @@ func TestInitNoJson(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitNotExists(t *testing.T) {
|
||||
var fsa FsArchive
|
||||
_, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/job-archive\"}"))
|
||||
@@ -61,8 +63,12 @@ func TestLoadJobMetaInternal(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b, err := os.ReadFile("testdata/archive/emmy/1404/397/1609300556/meta.json")
|
||||
if err != nil {
|
||||
t.Fatalf("loadJobMeta() > open file error: %v", err)
|
||||
}
|
||||
|
||||
job, err := loadJobMeta("testdata/archive/emmy/1404/397/1609300556/meta.json")
|
||||
job, err := loadJobMeta(b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -133,7 +139,6 @@ func TestLoadJobData(t *testing.T) {
|
||||
}
|
||||
|
||||
func BenchmarkLoadJobData(b *testing.B) {
|
||||
|
||||
tmpdir := b.TempDir()
|
||||
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||
util.CopyDir("./testdata/archive/", jobarchive)
|
||||
@@ -157,7 +162,6 @@ func BenchmarkLoadJobData(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkLoadJobDataCompressed(b *testing.B) {
|
||||
|
||||
tmpdir := b.TempDir()
|
||||
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||
util.CopyDir("./testdata/archive/", jobarchive)
|
||||
|
@@ -1,13 +1,318 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package archive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type S3ArchiveConfig struct {
|
||||
Path string `json:"filePath"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
AccessKeyID string `json:"accessKeyID"`
|
||||
SecretAccessKey string `json:"secretAccessKey"`
|
||||
Bucket string `json:"bucket"`
|
||||
UseSSL bool `json:"useSSL"`
|
||||
}
|
||||
|
||||
type S3Archive struct {
|
||||
path string
|
||||
client *minio.Client
|
||||
bucket string
|
||||
clusters []string
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) stat(object string) (*minio.ObjectInfo, error) {
|
||||
objectStat, e := s3a.client.StatObject(context.Background(),
|
||||
s3a.bucket,
|
||||
object, minio.GetObjectOptions{})
|
||||
|
||||
if e != nil {
|
||||
errResponse := minio.ToErrorResponse(e)
|
||||
if errResponse.Code == "AccessDenied" {
|
||||
return nil, errors.Wrap(e, "AccessDenied")
|
||||
}
|
||||
if errResponse.Code == "NoSuchBucket" {
|
||||
return nil, errors.Wrap(e, "NoSuchBucket")
|
||||
}
|
||||
if errResponse.Code == "InvalidBucketName" {
|
||||
return nil, errors.Wrap(e, "InvalidBucketName")
|
||||
}
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, errors.Wrap(e, "NoSuchKey")
|
||||
}
|
||||
return nil, e
|
||||
}
|
||||
return &objectStat, nil
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
|
||||
var config S3ArchiveConfig
|
||||
var err error
|
||||
if err = json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Warnf("Init() > Unmarshal error: %#v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
fmt.Printf("Endpoint: %s Bucket: %s\n", config.Endpoint, config.Bucket)
|
||||
|
||||
s3a.client, err = minio.New(config.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKey, ""),
|
||||
Secure: config.UseSSL,
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : Initialize minio client failed")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
s3a.bucket = config.Bucket
|
||||
|
||||
found, err := s3a.client.BucketExists(context.Background(), s3a.bucket)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : %v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if found {
|
||||
log.Infof("Bucket found.")
|
||||
} else {
|
||||
log.Infof("Bucket not found.")
|
||||
}
|
||||
|
||||
r, err := s3a.client.GetObject(context.Background(),
|
||||
s3a.bucket, "version.txt", minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : Get version object failed")
|
||||
return 0, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
b, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
log.Errorf("Init() : %v", err)
|
||||
return 0, err
|
||||
}
|
||||
version, err := strconv.ParseUint(strings.TrimSuffix(string(b), "\n"), 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("Init() : %v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if version != Version {
|
||||
return 0, fmt.Errorf("unsupported version %d, need %d", version, Version)
|
||||
}
|
||||
|
||||
for object := range s3a.client.ListObjects(
|
||||
context.Background(),
|
||||
s3a.bucket, minio.ListObjectsOptions{
|
||||
Recursive: false,
|
||||
}) {
|
||||
|
||||
if object.Err != nil {
|
||||
log.Errorf("listObject: %v", object.Err)
|
||||
}
|
||||
if strings.HasSuffix(object.Key, "/") {
|
||||
s3a.clusters = append(s3a.clusters, strings.TrimSuffix(object.Key, "/"))
|
||||
}
|
||||
}
|
||||
|
||||
return version, err
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) Info() {
|
||||
fmt.Printf("Job archive %s\n", s3a.bucket)
|
||||
var clusters []string
|
||||
|
||||
for object := range s3a.client.ListObjects(
|
||||
context.Background(),
|
||||
s3a.bucket, minio.ListObjectsOptions{
|
||||
Recursive: false,
|
||||
}) {
|
||||
|
||||
if object.Err != nil {
|
||||
log.Errorf("listObject: %v", object.Err)
|
||||
}
|
||||
if strings.HasSuffix(object.Key, "/") {
|
||||
clusters = append(clusters, object.Key)
|
||||
}
|
||||
}
|
||||
ci := make(map[string]*clusterInfo)
|
||||
for _, cluster := range clusters {
|
||||
ci[cluster] = &clusterInfo{dateFirst: time.Now().Unix()}
|
||||
for d := range s3a.client.ListObjects(
|
||||
context.Background(),
|
||||
s3a.bucket, minio.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
Prefix: cluster,
|
||||
}) {
|
||||
log.Errorf("%s", d.Key)
|
||||
ci[cluster].diskSize += (float64(d.Size) * 1e-6)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// func (s3a *S3Archive) Exists(job *schema.Job) bool {
|
||||
// return true
|
||||
// }
|
||||
|
||||
func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
|
||||
filename := getPath(job, "/", "meta.json")
|
||||
log.Infof("Init() : %s", filename)
|
||||
|
||||
r, err := s3a.client.GetObject(context.Background(),
|
||||
s3a.bucket, filename, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : Get version object failed")
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
b, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
log.Errorf("Init() : %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return loadJobMeta(b)
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||
isCompressed := true
|
||||
key := getPath(job, "./", "data.json.gz")
|
||||
|
||||
_, err := s3a.stat(key)
|
||||
if err != nil {
|
||||
if err.Error() == "NoSuchKey" {
|
||||
key = getPath(job, "./", "data.json")
|
||||
isCompressed = false
|
||||
}
|
||||
}
|
||||
|
||||
r, err := s3a.client.GetObject(context.Background(),
|
||||
s3a.bucket, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : Get version object failed")
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
return loadJobData(r, key, isCompressed)
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
|
||||
key := filepath.Join("./", name, "cluster.json")
|
||||
|
||||
r, err := s3a.client.GetObject(context.Background(),
|
||||
s3a.bucket, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Init() : Get version object failed")
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
return DecodeCluster(r)
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) ImportJob(
|
||||
jobMeta *schema.JobMeta,
|
||||
jobData *schema.JobData,
|
||||
) error {
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
|
||||
if err := EncodeJobMeta(w, jobMeta); err != nil {
|
||||
log.Error("Error while encoding job metadata to meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
key := getPath(&job, "./", "meta.json")
|
||||
s3a.client.PutObject(context.Background(),
|
||||
s3a.bucket, key, r,
|
||||
int64(unsafe.Sizeof(job)), minio.PutObjectOptions{})
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
log.Warn("Error while closing meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
//
|
||||
// f, err = os.Create(path.Join(dir, "data.json"))
|
||||
// if err != nil {
|
||||
// log.Error("Error while creating filepath for data.json")
|
||||
// return err
|
||||
// }
|
||||
// if err := EncodeJobData(f, jobData); err != nil {
|
||||
// log.Error("Error while encoding job metricdata to data.json file")
|
||||
// return err
|
||||
// }
|
||||
// if err := f.Close(); err != nil {
|
||||
// log.Warn("Error while closing data.json file")
|
||||
// }
|
||||
// return err
|
||||
//
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) StoreJobMeta(jobMeta *schema.JobMeta) error {
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
|
||||
if err := EncodeJobMeta(w, jobMeta); err != nil {
|
||||
log.Error("Error while encoding job metadata to meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
key := getPath(&job, "./", "meta.json")
|
||||
s3a.client.PutObject(context.Background(),
|
||||
s3a.bucket, key, r,
|
||||
int64(unsafe.Sizeof(job)), minio.PutObjectOptions{})
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
log.Warn("Error while closing meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s3a *S3Archive) GetClusters() []string {
|
||||
return s3a.clusters
|
||||
}
|
||||
|
||||
//
|
||||
// func (s3a *S3Archive) CleanUp(jobs []*schema.Job)
|
||||
//
|
||||
// func (s3a *S3Archive) Move(jobs []*schema.Job, path string)
|
||||
//
|
||||
// func (s3a *S3Archive) Clean(before int64, after int64)
|
||||
//
|
||||
// func (s3a *S3Archive) Compress(jobs []*schema.Job)
|
||||
//
|
||||
// func (s3a *S3Archive) CompressLast(starttime int64) int64
|
||||
//
|
||||
// func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer
|
||||
|
102
pkg/archive/s3Backend_test.go
Normal file
102
pkg/archive/s3Backend_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package archive
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
func TestS3Init(t *testing.T) {
|
||||
var s3a S3Archive
|
||||
version, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if s3a.bucket != "cc-archive" {
|
||||
t.Errorf("S3 bucket \ngot: %s \nwant: cc-archive", s3a.bucket)
|
||||
}
|
||||
if version != 1 {
|
||||
t.Errorf("S3 archive version \ngot: %d \nwant: 1", version)
|
||||
t.Fail()
|
||||
}
|
||||
if len(s3a.clusters) != 2 || s3a.clusters[0] != "alex" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3LoadJobMeta(t *testing.T) {
|
||||
var s3a S3Archive
|
||||
_, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
jobIn := schema.Job{BaseJob: schema.JobDefaults}
|
||||
jobIn.StartTime = time.Unix(1675954353, 0)
|
||||
jobIn.JobID = 398764
|
||||
jobIn.Cluster = "fritz"
|
||||
|
||||
job, err := s3a.LoadJobMeta(&jobIn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if job.JobID != 398764 {
|
||||
t.Fail()
|
||||
}
|
||||
if int(job.NumNodes) != len(job.Resources) {
|
||||
t.Fail()
|
||||
}
|
||||
if job.StartTime != 1675954353 {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3LoadJobData(t *testing.T) {
|
||||
var s3a S3Archive
|
||||
_, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
jobIn := schema.Job{BaseJob: schema.JobDefaults}
|
||||
jobIn.StartTime = time.Unix(1675954353, 0)
|
||||
jobIn.JobID = 398764
|
||||
jobIn.Cluster = "fritz"
|
||||
|
||||
data, err := s3a.LoadJobData(&jobIn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, scopes := range data {
|
||||
// fmt.Printf("Metric name: %s\n", name)
|
||||
|
||||
if _, exists := scopes[schema.MetricScopeNode]; !exists {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3LoadCluster(t *testing.T) {
|
||||
var s3a S3Archive
|
||||
_, err := s3a.Init(json.RawMessage("{\"endpoint\":\"192.168.1.10:9100\",\"accessKeyID\":\"uACSaCN2Chiotpnr4bBS\",\"secretAccessKey\":\"MkEbBsFvMii1K5GreUriTJZxH359B1n28Au9Kaml\",\"bucket\":\"cc-archive\",\"useSSL\":false}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfg, err := s3a.LoadClusterCfg("fritz")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if cfg.SubClusters[0].CoresPerSocket != 36 {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user