split statsTable data from jobMetrics query, initial commit

- mainly backend changes
- statstable changes only for prototyping
This commit is contained in:
Christoph Kluge
2025-03-13 17:33:55 +01:00
parent d0af933b35
commit f5f36427a4
19 changed files with 1471 additions and 426 deletions

View File

@@ -27,6 +27,8 @@ type ArchiveBackend interface {
LoadJobData(job *schema.Job) (schema.JobData, error)
LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)
LoadClusterCfg(name string) (*schema.Cluster, error)
StoreJobMeta(jobMeta *schema.JobMeta) error
@@ -125,7 +127,7 @@ func LoadAveragesFromArchive(
return nil
}
// Helper to metricdataloader.LoadStatData().
// Helper to metricdataloader.LoadJobStats().
func LoadStatsFromArchive(
job *schema.Job,
metrics []string,
@@ -154,6 +156,22 @@ func LoadStatsFromArchive(
return data, nil
}
// Helper to metricdataloader.LoadScopedJobStats().
func LoadScopedStatsFromArchive(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
) (schema.ScopedJobStats, error) {
data, err := ar.LoadJobStats(job)
if err != nil {
log.Warn("Error while loading job metadata from archiveBackend")
return nil, err
}
return data, nil
}
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
metaFile, err := ar.LoadJobMeta(job)
if err != nil {

View File

@@ -115,6 +115,40 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
}
}
func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobStats()- %v", err)
return nil, err
}
defer f.Close()
if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
return nil, err
}
defer r.Close()
if config.Keys.Validate {
if err := schema.Validate(schema.Data, r); err != nil {
return nil, fmt.Errorf("validate job data: %v", err)
}
}
return DecodeJobStats(r, filename)
} else {
if config.Keys.Validate {
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
return nil, fmt.Errorf("validate job data: %v", err)
}
}
return DecodeJobStats(bufio.NewReader(f), filename)
}
}
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
var config FsArchiveConfig
@@ -389,6 +423,18 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
return loadJobData(filename, isCompressed)
}
func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
var isCompressed bool = true
filename := getPath(job, fsa.path, "data.json.gz")
if !util.CheckFileExists(filename) {
filename = getPath(job, fsa.path, "data.json")
isCompressed = false
}
return loadJobStats(filename, isCompressed)
}
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
filename := getPath(job, fsa.path, "meta.json")
return loadJobMeta(filename)

View File

@@ -32,6 +32,43 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
return data.(schema.JobData), nil
}
func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) {
jobData, err := DecodeJobData(r, k)
// Convert schema.JobData to schema.ScopedJobStats
if jobData != nil {
scopedJobStats := make(schema.ScopedJobStats)
for metric, metricData := range jobData {
if _, ok := scopedJobStats[metric]; !ok {
scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
}
for scope, jobMetric := range metricData {
if _, ok := scopedJobStats[metric][scope]; !ok {
scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
}
for _, series := range jobMetric.Series {
scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
Hostname: series.Hostname,
Id: series.Id,
Data: &series.Statistics,
})
}
// So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty
if len(scopedJobStats[metric][scope]) == 0 {
delete(scopedJobStats[metric], scope)
if len(scopedJobStats[metric]) == 0 {
delete(scopedJobStats, metric)
}
}
}
}
return scopedJobStats, nil
}
return nil, err
}
func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
var d schema.JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil {

View File

@@ -15,6 +15,7 @@ import (
)
type JobData map[string]map[MetricScope]*JobMetric
type ScopedJobStats map[string]map[MetricScope][]*ScopedStats
type JobMetric struct {
StatisticsSeries *StatsSeries `json:"statisticsSeries,omitempty"`
@@ -30,6 +31,12 @@ type Series struct {
Statistics MetricStatistics `json:"statistics"`
}
type ScopedStats struct {
Hostname string `json:"hostname"`
Id *string `json:"id,omitempty"`
Data *MetricStatistics `json:"data"`
}
type MetricStatistics struct {
Avg float64 `json:"avg"`
Min float64 `json:"min"`