Merge pull request #320 from ClusterCockpit/hotfix

Fixes for Bugfix Release 1.4.2
This commit is contained in:
Jan Eitzinger 2024-12-19 14:51:07 +01:00 committed by GitHub
commit 9489ebc7d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1178 additions and 1265 deletions

View File

@ -2,7 +2,7 @@ TARGET = ./cc-backend
VAR = ./var
CFG = config.json .env
FRONTEND = ./web/frontend
VERSION = 1.4.1
VERSION = 1.4.2
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@ -1,4 +1,4 @@
# `cc-backend` version 1.4.1
# `cc-backend` version 1.4.2
Supports job archive version 2 and database version 8.
@ -12,7 +12,8 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
migration might require several hours!
- You need to adapt the `cluster.json` configuration files in the job-archive,
add new required attributes to the metric list and after that edit
`./job-archive/version.txt` to version 2.
`./job-archive/version.txt` to version 2. Only metrics that have the footprint
attribute set can be filtered and show up in the footprint UI and polar plot.
- Continuous scrolling is default now in all job lists. You can change this back
to paging globally, also every user can configure to use paging or continuous
scrolling individually.

View File

@ -112,7 +112,7 @@ func main() {
if flagInit {
initEnv()
fmt.Print("Succesfully setup environment!\n")
fmt.Print("Successfully setup environment!\n")
fmt.Print("Please review config.json and .env and adjust it to your needs.\n")
fmt.Print("Add your job-archive at ./var/job-archive.\n")
os.Exit(0)

View File

@ -25,7 +25,6 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/routerConfig"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv"
@ -314,9 +313,6 @@ func serverShutdown() {
// First shut down the server gracefully (waiting for all ongoing requests)
server.Shutdown(context.Background())
// Then, wait for any async jobStarts still pending...
repository.WaitForJobStart()
// Then, wait for any async archivings still pending...
archiver.WaitForArchiving()
}

View File

@ -1,5 +1,5 @@
[Unit]
Description=ClusterCockpit Web Server (Go edition)
Description=ClusterCockpit Web Server
Documentation=https://github.com/ClusterCockpit/cc-backend
Wants=network-online.target
After=network-online.target

View File

@ -249,9 +249,6 @@ func TestRestApi(t *testing.T) {
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
time.Sleep(1 * time.Second)
resolver := graph.GetResolverInstance()
job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
if err != nil {

View File

@ -123,18 +123,8 @@ func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) {
}
}
// StartJobApiResponse model
type StartJobApiResponse struct {
Message string `json:"msg"`
}
// DeleteJobApiResponse model
type DeleteJobApiResponse struct {
Message string `json:"msg"`
}
// UpdateUserApiResponse model
type UpdateUserApiResponse struct {
// DefaultApiResponse model
type DefaultJobApiResponse struct {
Message string `json:"msg"`
}
@ -341,7 +331,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
withMetadata := false
filter := &model.JobFilter{}
page := &model.PageRequest{ItemsPerPage: 25, Page: 1}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc}
for key, vals := range r.URL.Query() {
switch key {
@ -790,6 +780,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
}
// aquire lock to avoid race condition between API calls
var unlockOnce sync.Once
api.RepositoryMutex.Lock()
defer unlockOnce.Do(api.RepositoryMutex.Unlock)
// Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
@ -804,12 +799,27 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
}
}
repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())})
id, err := api.JobRepository.Start(&req)
if err != nil {
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
return
}
// unlock here, adding Tags can be async
unlockOnce.Do(api.RepositoryMutex.Unlock)
for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
return
}
}
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusCreated)
json.NewEncoder(rw).Encode(StartJobApiResponse{
Message: fmt.Sprintf("Successfully triggered job start"),
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: "success",
})
}
@ -892,7 +902,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted job %s", id),
})
}
@ -943,7 +953,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
})
}
@ -987,7 +997,7 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
})
}

View File

@ -36,10 +36,7 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
// ConcurrentJobs is the resolver for the concurrentJobs field.
func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
if obj.State == schema.JobStateRunning {
obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
}
// FIXME: Make the hardcoded duration configurable
if obj.Exclusive != 1 && obj.Duration > 600 {
return r.Repo.FindConcurrentJobs(ctx, obj)
}

View File

@ -82,8 +82,6 @@ func Connect(driver string, db string) {
if err != nil {
log.Fatal(err)
}
startJobStartWorker()
})
}

View File

@ -79,12 +79,9 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
}
job.RawFootprint = nil
// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
// return nil, err
// }
job.StartTime = time.Unix(job.StartTimeUnix, 0)
if job.Duration == 0 && job.State == schema.JobStateRunning {
// Always ensure accurate duration for running jobs
if job.State == schema.JobStateRunning {
job.Duration = int32(time.Since(job.StartTime).Seconds())
}
@ -457,6 +454,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
return subclusters, nil
}
// FIXME: Set duration to requested walltime?
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job").

View File

@ -170,8 +170,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
query = buildTimeCondition("job.start_time", filter.StartTime, query)
}
if filter.Duration != nil {
now := time.Now().Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs.
query = query.Where("(CASE WHEN job.job_state = 'running' THEN (? - job.start_time) ELSE job.duration END) BETWEEN ? AND ?", now, filter.Duration.From, filter.Duration.To)
query = buildIntCondition("job.duration", filter.Duration, query)
}
if filter.MinRunningFor != nil {
now := time.Now().Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs.

View File

@ -1,83 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type JobWithUser struct {
Job *schema.JobMeta
User *schema.User
}
var (
jobStartPending sync.WaitGroup
jobStartChannel chan JobWithUser
)
func startJobStartWorker() {
jobStartChannel = make(chan JobWithUser, 128)
go jobStartWorker()
}
// Archiving worker thread
func jobStartWorker() {
for {
select {
case req, ok := <-jobStartChannel:
if !ok {
break
}
jobRepo := GetJobRepository()
var id int64
for i := 0; i < 5; i++ {
var err error
id, err = jobRepo.Start(req.Job)
if err != nil {
log.Errorf("Attempt %d: insert into database failed: %v", i, err)
} else {
break
}
time.Sleep(1 * time.Second)
}
for _, tag := range req.Job.Tags {
if _, err := jobRepo.AddTagOrCreate(req.User, id,
tag.Type, tag.Name, tag.Scope); err != nil {
log.Errorf("adding tag to new job %d failed: %v", id, err)
}
}
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d",
id, req.Job.Cluster, req.Job.JobID, req.Job.User, req.Job.StartTime)
jobStartPending.Done()
}
}
}
// Trigger async archiving
func TriggerJobStart(req JobWithUser) {
if jobStartChannel == nil {
log.Fatal("Cannot start Job without jobStart channel. Did you Start the worker?")
}
jobStartPending.Add(1)
jobStartChannel <- req
}
// Wait for background thread to finish pending archiving operations
func WaitForJobStart() {
// close channel and wait for worker to process remaining jobs
jobStartPending.Wait()
}

View File

@ -111,7 +111,7 @@ func BenchmarkDB_QueryJobs(b *testing.B) {
user := "mppi133h"
filter.User = &model.StringInput{Eq: &user}
page := &model.PageRequest{ItemsPerPage: 50, Page: 1}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc}
b.Run("QueryJobs", func(b *testing.B) {
db := setup(b)

View File

@ -182,6 +182,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
return i
}
// FIXME: Lots of redundant code. Needs refactoring
func buildFilterPresets(query url.Values) map[string]interface{} {
filterPresets := map[string]interface{}{}

View File

@ -9,12 +9,11 @@
-->
<script context="module">
function findJobThresholds(job, metricConfig) {
if (!job || !metricConfig) {
function findJobThresholds(job, stat, metricConfig) {
if (!job || !metricConfig || !stat) {
console.warn("Argument missing for findJobThresholds!");
return null;
}
// metricConfig is on subCluster-Level
const defaultThresholds = {
peak: metricConfig.peak,
@ -22,13 +21,13 @@
caution: metricConfig.caution,
alert: metricConfig.alert
};
/*
NEW: Footprints should be comparable: Always use Unchanged Single Node Thresholds, except for shared jobs.
HW Clocks, HW Temperatures and File/Net IO Thresholds will be scaled down too, even if they are independent.
'jf.stats' is one of: avg, min, max -> Always relative to one nodes' thresholds as configured.
Footprints should be comparable:
Always use unchanged single node thresholds for exclusive jobs and "avg" Footprints.
For shared jobs, scale thresholds by the fraction of the job's HWThreads to the node's HWThreads.
'stat' is one of: avg, min, max
*/
if (job.exclusive === 1) {
if (job.exclusive === 1 || stat === "avg") {
return defaultThresholds
} else {
const topol = getContext("getHardwareTopology")(job.cluster, job.subCluster)
@ -40,29 +39,6 @@
alert: round(defaultThresholds.alert * jobFraction, 0),
};
}
/* OLD: Based on Metric Aggregation Setting
// Job_Exclusivity does not matter, only aggregation
if (metricConfig.aggregation === "avg") {
return defaultThresholds;
} else if (metricConfig.aggregation === "sum") {
const topol = getContext("getHardwareTopology")(job.cluster, job.subCluster)
const jobFraction = job.numHWThreads / topol.node.length;
return {
peak: round(defaultThresholds.peak * jobFraction, 0),
normal: round(defaultThresholds.normal * jobFraction, 0),
caution: round(defaultThresholds.caution * jobFraction, 0),
alert: round(defaultThresholds.alert * jobFraction, 0),
};
} else {
console.warn(
"Missing or unkown aggregation mode (sum/avg) for metric:",
metricConfig,
);
return defaultThresholds;
}
*/
}
</script>
@ -93,7 +69,7 @@
const unit = (fmc?.unit?.prefix ? fmc.unit.prefix : "") + (fmc?.unit?.base ? fmc.unit.base : "")
// Threshold / -Differences
const fmt = findJobThresholds(job, fmc);
const fmt = findJobThresholds(job, jf.stat, fmc);
if (jf.name === "flops_any") fmt.peak = round(fmt.peak * 0.85, 0);
// Define basic data -> Value: Use as Provided

View File

@ -7,7 +7,7 @@
-->
<script>
import { Badge, Button, Icon } from "@sveltestrap/sveltestrap";
import { Badge, Button, Icon, Tooltip } from "@sveltestrap/sveltestrap";
import { scrambleNames, scramble } from "../utils.js";
import Tag from "../helper/Tag.svelte";
import TagManagement from "../helper/TagManagement.svelte";
@ -42,12 +42,30 @@
let displayCheck = false;
function clipJobId(jid) {
displayCheck = true;
// Navigator clipboard api needs a secure context (https)
if (navigator.clipboard && window.isSecureContext) {
navigator.clipboard
.writeText(jid)
.catch((reason) => console.error(reason));
} else {
// Workaround: Create, Fill, And Copy Content of Textarea
const textArea = document.createElement("textarea");
textArea.value = jid;
textArea.style.position = "absolute";
textArea.style.left = "-999999px";
document.body.prepend(textArea);
textArea.select();
try {
document.execCommand('copy');
} catch (error) {
console.error(error);
} finally {
textArea.remove();
}
}
setTimeout(function () {
displayCheck = false;
}, 1500);
}, 1000);
}
</script>
@ -58,13 +76,18 @@
<a href="/monitoring/job/{job.id}" target="_blank">{job.jobId}</a>
({job.cluster})
</span>
<Button outline color="secondary" size="sm" title="Copy JobID to Clipboard" on:click={clipJobId(job.jobId)} >
<Button id={`${job.cluster}-${job.jobId}-clipboard`} outline color="secondary" size="sm" on:click={clipJobId(job.jobId)} >
{#if displayCheck}
<Icon name="clipboard2-check-fill"/> Copied
<Icon name="clipboard2-check-fill"/>
{:else}
<Icon name="clipboard2"/> Job ID
<Icon name="clipboard2"/>
{/if}
</Button>
<Tooltip
target={`${job.cluster}-${job.jobId}-clipboard`}
placement="right">
{ displayCheck ? 'Copied!' : 'Copy Job ID to Clipboard' }
</Tooltip>
</span>
{#if job.metaData?.jobName}
{#if job.metaData?.jobName.length <= 25}