fix stop_job returned state; handle monitoring status

This commit is contained in:
Lou Knauer 2022-02-15 14:25:39 +01:00
parent 8d4498f58e
commit 2f32760cc5
6 changed files with 22 additions and 17 deletions

View File

@ -255,12 +255,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
}) })
} }
const (
// TODO: Constants in schema/? What constants to use?
MonitoringStatusArchivingSuccessfull int32 = 0
MonitoringStatusArchivingFailed int32 = 2
)
// A job has stopped and should be archived. // A job has stopped and should be archived.
func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) { if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
@ -314,7 +308,8 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
// Mark job as stopped in the database (update state and duration) // Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix()) job.Duration = int32(req.StopTime - job.StartTime.Unix())
if err := api.JobRepository.Stop(job.ID, job.Duration, req.State); err != nil { job.State = req.State
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
return return
} }
@ -328,6 +323,11 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job) json.NewEncoder(rw).Encode(job)
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
return
}
// We need to start a new goroutine as this functions needs to return // We need to start a new goroutine as this functions needs to return
// for the response to be flushed to the client. // for the response to be flushed to the client.
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine. api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
@ -338,12 +338,12 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
jobMeta, err := metricdata.ArchiveJob(job, context.Background()) jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil { if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, MonitoringStatusArchivingFailed) api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
return return
} }
// Update the jobs database entry one last time: // Update the jobs database entry one last time:
if err := api.JobRepository.Archive(job.ID, 0, jobMeta.Statistics); err != nil { if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
return return
} }

View File

@ -40,7 +40,7 @@ const JOBS_DB_SCHEMA string = `
num_acc INT NOT NULL, num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )), smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)), exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1 )), monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0, mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0, flops_any_avg REAL NOT NULL DEFAULT 0.0,

View File

@ -136,10 +136,6 @@ func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.
// Writes a running job to the job-archive // Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
if job.State != schema.JobStateRunning {
return nil, errors.New("cannot archive job that is not running")
}
allMetrics := make([]string, 0) allMetrics := make([]string, 0)
metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig
for _, mc := range metricConfigs { for _, mc := range metricConfigs {

View File

@ -65,7 +65,7 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
data := cache.Get(cacheKey(job, metrics, scopes), func() (interface{}, time.Duration, int) { data := cache.Get(cacheKey(job, metrics, scopes), func() (interface{}, time.Duration, int) {
var jd schema.JobData var jd schema.JobData
var err error var err error
if job.State == schema.JobStateRunning || !useArchive { if job.State == schema.JobStateRunning || job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || !useArchive {
repo, ok := metricDataRepos[job.Cluster] repo, ok := metricDataRepos[job.Cluster]
if !ok { if !ok {
return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0 return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0

View File

@ -77,11 +77,13 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
func (r *JobRepository) Stop( func (r *JobRepository) Stop(
jobId int64, jobId int64,
duration int32, duration int32,
state schema.JobState) (err error) { state schema.JobState,
monitoringStatus int32) (err error) {
stmt := sq.Update("job"). stmt := sq.Update("job").
Set("job_state", state). Set("job_state", state).
Set("duration", duration). Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId) Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.DB).Exec() _, err = stmt.RunWith(r.DB).Exec()

View File

@ -57,9 +57,16 @@ type JobMeta struct {
Statistics map[string]JobStatistics `json:"statistics,omitempty"` Statistics map[string]JobStatistics `json:"statistics,omitempty"`
} }
const (
MonitoringStatusDisabled int32 = 0
MonitoringStatusRunningOrArchiving int32 = 1
MonitoringStatusArchivingFailed int32 = 2
MonitoringStatusArchivingSuccessful int32 = 3
)
var JobDefaults BaseJob = BaseJob{ var JobDefaults BaseJob = BaseJob{
Exclusive: 1, Exclusive: 1,
MonitoringStatus: 1, MonitoringStatus: MonitoringStatusRunningOrArchiving,
MetaData: "", MetaData: "",
} }