mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-19 03:11:40 +02:00
Merge branch 'master' into 34-52-rework-searchbar
This commit is contained in:
@@ -6,7 +6,6 @@ package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -23,7 +22,6 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
@@ -56,7 +54,6 @@ type RestApi struct {
|
||||
Resolver *graph.Resolver
|
||||
Authentication *auth.Authentication
|
||||
MachineStateDir string
|
||||
OngoingArchivings sync.WaitGroup
|
||||
RepositoryMutex sync.Mutex
|
||||
}
|
||||
|
||||
@@ -721,34 +718,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
||||
return
|
||||
}
|
||||
|
||||
// We need to start a new goroutine as this functions needs to return
|
||||
// for the response to be flushed to the client.
|
||||
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
|
||||
go func() {
|
||||
defer api.OngoingArchivings.Done()
|
||||
|
||||
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
return
|
||||
}
|
||||
|
||||
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
|
||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the jobs database entry one last time:
|
||||
if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||
}()
|
||||
// Trigger async archiving
|
||||
api.JobRepository.TriggerArchiving(job)
|
||||
}
|
||||
|
||||
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
|
||||
|
@@ -55,6 +55,8 @@ func Init(disableArchive bool) error {
|
||||
mdr = &CCMetricStore{}
|
||||
case "influxdb":
|
||||
mdr = &InfluxDBv2DataRepository{}
|
||||
case "prometheus":
|
||||
mdr = &PrometheusDataRepository{}
|
||||
case "test":
|
||||
mdr = &TestMetricDataRepository{}
|
||||
default:
|
||||
@@ -156,6 +158,7 @@ func LoadData(job *schema.Job,
|
||||
}
|
||||
|
||||
prepareJobData(job, jd, scopes)
|
||||
|
||||
return jd, ttl, size
|
||||
})
|
||||
|
||||
|
449
internal/metricdata/prometheus.go
Normal file
449
internal/metricdata/prometheus.go
Normal file
@@ -0,0 +1,449 @@
|
||||
// Copyright (C) 2022 DKRZ
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package metricdata
|
||||
|
||||
import (
|
||||
"os"
|
||||
"errors"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
"bytes"
|
||||
"net/http"
|
||||
"time"
|
||||
"math"
|
||||
"sort"
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
promapi "github.com/prometheus/client_golang/api"
|
||||
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
promcfg "github.com/prometheus/common/config"
|
||||
promm "github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
type PrometheusDataRepositoryConfig struct {
|
||||
Url string `json:"url"`
|
||||
Username string `json:"username,omitempty"`
|
||||
Suffix string `json:"suffix,omitempty"`
|
||||
Templates map[string]string `json:"query-templates"`
|
||||
}
|
||||
|
||||
type PrometheusDataRepository struct {
|
||||
client promapi.Client
|
||||
queryClient promv1.API
|
||||
suffix string
|
||||
templates map[string]*template.Template
|
||||
}
|
||||
|
||||
type PromQLArgs struct {
|
||||
Nodes string
|
||||
}
|
||||
|
||||
type Trie map[rune]Trie
|
||||
|
||||
var logOnce sync.Once
|
||||
|
||||
func contains(s []schema.MetricScope, str schema.MetricScope) bool {
|
||||
for _, v := range s {
|
||||
if v == str {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
func MinMaxMean(data []schema.Float) (float64, float64, float64) {
|
||||
if len(data) == 0 {
|
||||
return 0.0, 0.0, 0.0
|
||||
}
|
||||
min := math.MaxFloat64
|
||||
max := -math.MaxFloat64
|
||||
var sum float64
|
||||
var n float64
|
||||
for _, val := range data {
|
||||
if val.IsNaN() {
|
||||
continue
|
||||
}
|
||||
sum += float64(val)
|
||||
n += 1
|
||||
if float64(val) > max {max = float64(val)}
|
||||
if float64(val) < min {min = float64(val)}
|
||||
}
|
||||
return min, max, sum / n
|
||||
}
|
||||
|
||||
|
||||
// Rewritten from
|
||||
// https://github.com/ermanh/trieregex/blob/master/trieregex/trieregex.py
|
||||
func nodeRegex(nodes []string) string {
|
||||
root := Trie{}
|
||||
// add runes of each compute node to trie
|
||||
for _, node := range nodes {
|
||||
_trie := root
|
||||
for _, c := range node {
|
||||
if _, ok := _trie[c]; !ok {_trie[c] = Trie{}}
|
||||
_trie = _trie[c]
|
||||
}
|
||||
_trie['*'] = Trie{}
|
||||
}
|
||||
// recursively build regex from rune trie
|
||||
var trieRegex func(trie Trie, reset bool) string
|
||||
trieRegex = func(trie Trie, reset bool) string {
|
||||
if reset == true {
|
||||
trie = root
|
||||
}
|
||||
if len(trie) == 0 {
|
||||
return ""
|
||||
}
|
||||
if len(trie) == 1 {
|
||||
for key, _trie := range trie {
|
||||
if key == '*' { return "" }
|
||||
return regexp.QuoteMeta(string(key)) + trieRegex(_trie, false)
|
||||
}
|
||||
} else {
|
||||
sequences := []string{}
|
||||
for key, _trie := range trie {
|
||||
if key != '*' {
|
||||
sequences = append(sequences, regexp.QuoteMeta(string(key)) + trieRegex(_trie, false))
|
||||
}
|
||||
}
|
||||
sort.Slice(sequences, func(i, j int) bool {
|
||||
return (-len(sequences[i]) < -len(sequences[j])) || (sequences[i] < sequences[j])
|
||||
})
|
||||
var result string
|
||||
// single edge from this tree node
|
||||
if len(sequences) == 1 {
|
||||
result = sequences[0]
|
||||
if len(result) > 1 {
|
||||
result = "(?:" + result + ")"
|
||||
}
|
||||
// multiple edges, each length 1
|
||||
} else if s := strings.Join(sequences, ""); len(s) == len(sequences) {
|
||||
// char or numeric range
|
||||
if len(s)-1 == int(s[len(s)-1]) - int(s[0]) {
|
||||
result = fmt.Sprintf("[%c-%c]", s[0], s[len(s)-1])
|
||||
// char or numeric set
|
||||
} else {
|
||||
result = "[" + s + "]"
|
||||
}
|
||||
// multiple edges of different lengths
|
||||
} else {
|
||||
result = "(?:" + strings.Join(sequences, "|") + ")"
|
||||
}
|
||||
if _, ok := trie['*']; ok { result += "?"}
|
||||
return result
|
||||
}
|
||||
return ""
|
||||
}
|
||||
return trieRegex(root, true)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
|
||||
var config PrometheusDataRepositoryConfig
|
||||
// parse config
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
return err
|
||||
}
|
||||
// support basic authentication
|
||||
var rt http.RoundTripper = nil
|
||||
if prom_pw := os.Getenv("PROMETHEUS_PASSWORD"); prom_pw != "" && config.Username != "" {
|
||||
prom_pw := promcfg.Secret(prom_pw)
|
||||
rt = promcfg.NewBasicAuthRoundTripper(config.Username, prom_pw, "", promapi.DefaultRoundTripper)
|
||||
} else {
|
||||
if config.Username != "" {
|
||||
return errors.New("Prometheus username provided, but PROMETHEUS_PASSWORD not set.")
|
||||
}
|
||||
}
|
||||
// init client
|
||||
client, err := promapi.NewClient(promapi.Config{
|
||||
Address: config.Url,
|
||||
RoundTripper: rt,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// init query client
|
||||
pdb.client = client
|
||||
pdb.queryClient = promv1.NewAPI(pdb.client)
|
||||
// site config
|
||||
pdb.suffix = config.Suffix
|
||||
// init query templates
|
||||
pdb.templates = make(map[string]*template.Template)
|
||||
for metric, templ := range config.Templates {
|
||||
pdb.templates[metric], err = template.New(metric).Parse(templ)
|
||||
if err == nil {
|
||||
log.Debugf("Added PromQL template for %s: %s", metric, templ)
|
||||
} else {
|
||||
log.Errorf("Failed to parse PromQL template %s for metric %s", templ, metric)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// TODO: respect scope argument
|
||||
func (pdb *PrometheusDataRepository) FormatQuery(
|
||||
metric string,
|
||||
scope schema.MetricScope,
|
||||
nodes []string,
|
||||
cluster string) (string, error) {
|
||||
|
||||
args := PromQLArgs{}
|
||||
if len(nodes) > 0 {
|
||||
args.Nodes = fmt.Sprintf("(%s)%s", nodeRegex(nodes), pdb.suffix)
|
||||
} else {
|
||||
args.Nodes = fmt.Sprintf(".*%s", pdb.suffix)
|
||||
}
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
if templ, ok := pdb.templates[metric]; ok {
|
||||
err := templ.Execute(buf, args)
|
||||
if err != nil {
|
||||
return "", errors.New(fmt.Sprintf("Error compiling template %v", templ))
|
||||
} else {
|
||||
query := buf.String()
|
||||
log.Debugf(fmt.Sprintf("PromQL: %s", query))
|
||||
return query, nil
|
||||
}
|
||||
} else {
|
||||
return "", errors.New(fmt.Sprintf("No PromQL for metric %s configured.", metric))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Convert PromAPI row to CC schema.Series
|
||||
func (pdb *PrometheusDataRepository) RowToSeries(
|
||||
from time.Time,
|
||||
step int64,
|
||||
steps int64,
|
||||
row *promm.SampleStream) (schema.Series) {
|
||||
ts := from.Unix()
|
||||
hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)
|
||||
// init array of expected length with NaN
|
||||
values := make([]schema.Float, steps + 1)
|
||||
for i, _ := range values {
|
||||
values[i] = schema.NaN
|
||||
}
|
||||
// copy recorded values from prom sample pair
|
||||
for _, v := range row.Values {
|
||||
idx := (v.Timestamp.Unix() - ts) / step
|
||||
values[idx] = schema.Float(v.Value)
|
||||
}
|
||||
min, max, mean := MinMaxMean(values)
|
||||
// output struct
|
||||
return schema.Series{
|
||||
Hostname: hostname,
|
||||
Data: values,
|
||||
Statistics: &schema.MetricStatistics{
|
||||
Avg: mean,
|
||||
Min: min,
|
||||
Max: max,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
func (pdb *PrometheusDataRepository) LoadData(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context) (schema.JobData, error) {
|
||||
|
||||
// TODO respect requested scope
|
||||
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode){
|
||||
scopes = append(scopes, schema.MetricScopeNode)
|
||||
}
|
||||
|
||||
jobData := make(schema.JobData)
|
||||
// parse job specs
|
||||
nodes := make([]string, len(job.Resources))
|
||||
for i, resource := range job.Resources {
|
||||
nodes[i] = resource.Hostname
|
||||
}
|
||||
from := job.StartTime
|
||||
to := job.StartTime.Add(time.Duration(job.Duration) * time.Second)
|
||||
|
||||
for _, scope := range scopes {
|
||||
if scope != schema.MetricScopeNode {
|
||||
logOnce.Do(func(){log.Infof(fmt.Sprintf("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope))})
|
||||
continue
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
metricConfig := archive.GetMetricConfig(job.Cluster, metric)
|
||||
if metricConfig == nil {
|
||||
log.Errorf(fmt.Sprintf("Error in LoadData: Metric %s for cluster %s not configured",
|
||||
metric, job.Cluster))
|
||||
return nil, errors.New("Prometheus querry error")
|
||||
}
|
||||
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ranged query over all job nodes
|
||||
r := promv1.Range{
|
||||
Start: from,
|
||||
End: to,
|
||||
Step: time.Duration(metricConfig.Timestep * 1e9),
|
||||
}
|
||||
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(fmt.Sprintf("Prometheus query error in LoadData: %v\nQuery: %s", err, query))
|
||||
return nil, errors.New("Prometheus querry error")
|
||||
}
|
||||
if len(warnings) > 0 {
|
||||
log.Warnf(fmt.Sprintf("Warnings: %v\n", warnings))
|
||||
}
|
||||
|
||||
// init data structures
|
||||
if _, ok := jobData[metric]; !ok {
|
||||
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||
}
|
||||
jobMetric, ok := jobData[metric][scope]
|
||||
if !ok {
|
||||
jobMetric = &schema.JobMetric{
|
||||
Unit: metricConfig.Unit,
|
||||
Scope: scope,
|
||||
Timestep: metricConfig.Timestep,
|
||||
Series: make([]schema.Series, 0),
|
||||
}
|
||||
jobData[metric][scope] = jobMetric
|
||||
}
|
||||
step := int64(metricConfig.Timestep)
|
||||
steps := int64(to.Sub(from).Seconds()) / step
|
||||
// iter rows of host, metric, values
|
||||
for _, row := range result.(promm.Matrix) {
|
||||
jobMetric.Series = append(jobMetric.Series,
|
||||
pdb.RowToSeries(from, step, steps, row))
|
||||
}
|
||||
// sort by hostname to get uniform coloring
|
||||
sort.Slice(jobMetric.Series, func(i, j int) bool {
|
||||
return (jobMetric.Series[i].Hostname < jobMetric.Series[j].Hostname)
|
||||
})
|
||||
}
|
||||
}
|
||||
return jobData, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// TODO change implementation to precomputed/cached stats
|
||||
func (pdb *PrometheusDataRepository) LoadStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
|
||||
|
||||
// map of metrics of nodes of stats
|
||||
stats := map[string]map[string]schema.MetricStatistics{}
|
||||
|
||||
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for metric, metricData := range data {
|
||||
stats[metric] = make(map[string]schema.MetricStatistics)
|
||||
for _, series := range metricData[schema.MetricScopeNode].Series {
|
||||
stats[metric][series.Hostname] = *series.Statistics
|
||||
}
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
func (pdb *PrometheusDataRepository) LoadNodeData(
|
||||
cluster string,
|
||||
metrics, nodes []string,
|
||||
scopes []schema.MetricScope,
|
||||
from, to time.Time,
|
||||
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
|
||||
t0 := time.Now()
|
||||
// Map of hosts of metrics of value slices
|
||||
data := make(map[string]map[string][]*schema.JobMetric)
|
||||
// query db for each metric
|
||||
// TODO: scopes seems to be always empty
|
||||
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
|
||||
scopes = append(scopes, schema.MetricScopeNode)
|
||||
}
|
||||
for _, scope := range scopes {
|
||||
if scope != schema.MetricScopeNode {
|
||||
logOnce.Do(func(){log.Infof(fmt.Sprintf("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope))})
|
||||
continue
|
||||
}
|
||||
for _, metric := range metrics {
|
||||
metricConfig := archive.GetMetricConfig(cluster, metric)
|
||||
if metricConfig == nil {
|
||||
log.Errorf(fmt.Sprintf("Error in LoadNodeData: Metric %s for cluster %s not configured",
|
||||
metric, cluster))
|
||||
return nil, errors.New("Prometheus querry error")
|
||||
}
|
||||
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ranged query over all nodes
|
||||
r := promv1.Range{
|
||||
Start: from,
|
||||
End: to,
|
||||
Step: time.Duration(metricConfig.Timestep * 1e9),
|
||||
}
|
||||
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(fmt.Sprintf("Prometheus query error in LoadNodeData: %v\n", err))
|
||||
return nil, errors.New("Prometheus querry error")
|
||||
}
|
||||
if len(warnings) > 0 {
|
||||
log.Warnf(fmt.Sprintf("Warnings: %v\n", warnings))
|
||||
}
|
||||
|
||||
step := int64(metricConfig.Timestep)
|
||||
steps := int64(to.Sub(from).Seconds()) / step
|
||||
|
||||
// iter rows of host, metric, values
|
||||
for _, row := range result.(promm.Matrix) {
|
||||
hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)
|
||||
hostdata, ok := data[hostname]
|
||||
if !ok {
|
||||
hostdata = make(map[string][]*schema.JobMetric)
|
||||
data[hostname] = hostdata
|
||||
}
|
||||
// output per host and metric
|
||||
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
|
||||
Unit: metricConfig.Unit,
|
||||
Scope: scope,
|
||||
Timestep: metricConfig.Timestep,
|
||||
Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)},
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
t1 := time.Since(t0)
|
||||
log.Debugf(fmt.Sprintf("LoadNodeData of %v nodes took %s", len(data), t1))
|
||||
return data, nil
|
||||
}
|
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -33,6 +34,9 @@ type JobRepository struct {
|
||||
|
||||
stmtCache *sq.StmtCache
|
||||
cache *lrucache.Cache
|
||||
|
||||
archiveChannel chan *schema.Job
|
||||
archivePending sync.WaitGroup
|
||||
}
|
||||
|
||||
func GetJobRepository() *JobRepository {
|
||||
@@ -43,7 +47,10 @@ func GetJobRepository() *JobRepository {
|
||||
DB: db.DB,
|
||||
stmtCache: sq.NewStmtCache(db.DB),
|
||||
cache: lrucache.New(1024 * 1024),
|
||||
archiveChannel: make(chan *schema.Job, 128),
|
||||
}
|
||||
// start archiving worker
|
||||
go jobRepoInstance.archivingWorker()
|
||||
})
|
||||
|
||||
return jobRepoInstance
|
||||
@@ -361,7 +368,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
||||
}
|
||||
|
||||
// Stop updates the job with the database id jobId using the provided arguments.
|
||||
func (r *JobRepository) Archive(
|
||||
func (r *JobRepository) MarkArchived(
|
||||
jobId int64,
|
||||
monitoringStatus int32,
|
||||
metricStats map[string]schema.JobStatistics) error {
|
||||
@@ -393,11 +400,60 @@ func (r *JobRepository) Archive(
|
||||
return nil
|
||||
}
|
||||
|
||||
// Archiving worker thread
|
||||
func (r *JobRepository) archivingWorker(){
|
||||
for {
|
||||
select {
|
||||
case job, ok := <- r.archiveChannel:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// not using meta data, called to load JobMeta into Cache?
|
||||
// will fail if job meta not in repository
|
||||
if _, err := r.FetchMetadata(job); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
continue
|
||||
}
|
||||
|
||||
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
|
||||
// TODO: Maybe use context with cancel/timeout here
|
||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
continue
|
||||
}
|
||||
|
||||
// Update the jobs database entry one last time:
|
||||
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||
r.archivePending.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger async archiving
|
||||
func (r *JobRepository) TriggerArchiving(job *schema.Job){
|
||||
r.archivePending.Add(1)
|
||||
r.archiveChannel <- job
|
||||
}
|
||||
|
||||
// Wait for background thread to finish pending archiving operations
|
||||
func (r *JobRepository) WaitForArchiving(){
|
||||
// close channel and wait for worker to process remaining jobs
|
||||
r.archivePending.Wait()
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such jobname, project or user")
|
||||
var ErrForbidden = errors.New("not authorized")
|
||||
|
||||
// FindJobOrUserOrProject returns a job database ID or a username or a projectId if a job or user or project matches the search term.
|
||||
// As 0 is a valid job id, check if username/projectId is "" instead in order to check what matched.
|
||||
// FindJobnameOrUserOrProject returns a jobName or a username or a projectId if a jobName or user or project matches the search term.
|
||||
// If query is found to be an integer (= conversion to INT datatype succeeds), skip back to parent call
|
||||
// If nothing matches the search, `ErrNotFound` is returned.
|
||||
|
||||
func (r *JobRepository) FindJobnameOrUserOrProject(ctx context.Context, searchterm string) (metasnip string, username string, project string, err error) {
|
||||
|
Reference in New Issue
Block a user