Add new script to stop jobs that were (for some reason) not stopped in CC.

Joachim Meyer 2023-03-22 10:51:00 +01:00
parent 4259611495
commit 89f6440e0f


@@ -0,0 +1,115 @@
#!/usr/bin/python3
import requests
import json
import subprocess


class CCApi:
    config = {}
    apiurl = ''
    apikey = ''
    headers = {}
    debug = False

    def __init__(self, config, debug=False):
        self.config = config
        self.apiurl = "%s/api/" % config['cc-backend']['host']
        self.apikey = config['cc-backend']['apikey']
        self.headers = {'accept': 'application/ld+json',
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
        self.debug = debug

    def stopJob(self, id, data):
        # Mark the job with the given cluster cockpit database id as stopped.
        url = self.apiurl+"jobs/stop_job/%d" % id
        r = requests.post(url, headers=self.headers, json=data)
        if r.status_code == 200:
            return r.json()
        else:
            print(data)
            print(r.status_code, r.content)
            return False

    def getJobs(self, filter_running=True):
        url = self.apiurl+"jobs/"
        if filter_running:
            url = url+"?state=running"
        r = requests.get(url, headers=self.headers)
        if r.status_code == 200:
            return r.json()['jobs']
        else:
            return []
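
    # The backend's reply is assumed to look like {"jobs": [{"id": ...,
    # "jobId": ..., "cluster": ..., "startTime": ..., "arrayJobId": ...},
    # ...]}; only these five fields are used below.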

    def _getSubmitNodeId(self, globalJobId):
        # The first '#'-part of a GlobalJobId is the submit host; map its
        # short name to the numeric id configured in htcondor.submitnodes.
        job_id_parts = globalJobId.split('#')
        return self.config['htcondor']['submitnodes'][job_id_parts[0].split('.')[0]]

    def _jobIdToInt(self, globalJobId):
        # Pack condor's cluster id, proc id and the submit node id into a
        # single integer, the jobId that cluster cockpit stores.
        job_id_parts = globalJobId.split('#')
        cluster_id, proc_id = [int(id) for id in job_id_parts[1].split('.')]
        return cluster_id << 32 | ((proc_id & 0x3FFFFFFF) << 2) | self._getSubmitNodeId(globalJobId)
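
    # For illustration (hypothetical values): the GlobalJobId
    # "conduit.cs.uni-saarland.de#12345.0#1679475060" with submitnodes
    # {"conduit": 0} packs to 12345 << 32 | (0 << 2) | 0 == 53021371269120.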

    def compare_jobs(self, job, cc_job):
        print(self._jobIdToInt(job['GlobalJobId']), cc_job['jobId'])
        return self._jobIdToInt(job['GlobalJobId']) == cc_job['jobId'] and \
            abs(cc_job['startTime'] - job['EnteredCurrentStatus']) < 5


def get_entered_current_status(arrayId, jobId, startTime):
    # Pick the submit node to ask: arrayIds divisible by 10 map to conduit,
    # all others to conduit2.
    conduit = "conduit" if arrayId % 10 == 0 else "conduit2"
    # Undo the packing done in CCApi._jobIdToInt to recover "cluster.proc".
    condor_job_id = str(jobId >> 32) + "." + str((jobId >> 2) & 0x3FFFFFFF)
    historys = subprocess.run(
        ["ssh", conduit + ".cs.uni-saarland.de", "condor_history", "-json",
         condor_job_id, "-limit", "1"], capture_output=True, text=True).stdout
    # condor_history prints nothing at all (not "[]") when the job is unknown.
    history = json.loads(historys) if historys.strip() else []
    if len(history) > 0:
        return history[0]['EnteredCurrentStatus']
    querys = subprocess.run(
        ["ssh", conduit + ".cs.uni-saarland.de", "condor_q", "-json",
         condor_job_id], capture_output=True, text=True).stdout
    query = json.loads(querys) if querys.strip() else []
    if len(query) > 0:
        return query[0]['EnteredCurrentStatus']
    # Fall back to one second after the recorded start time.
    return startTime + 1
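
# For reference, condor_q -json and condor_history -json both print a JSON
# array of job ClassAds; a trimmed (assumed) example:
#   [{"GlobalJobId": "conduit.cs.uni-saarland.de#12345.0#1679475060",
#     "EnteredCurrentStatus": 1679475060, ...}]
# EnteredCurrentStatus is the Unix time of the job's last status change.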


if __name__ == "__main__":
    import argparse

    about = """This script stops jobs in the cluster cockpit backend that are no
    longer running according to HTCondor. It reads the running jobs from
    cluster cockpit via its api and the corresponding job info from HTCondor
    via its command line tools. Every job that cluster cockpit still lists as
    running but HTCondor does not is stopped as cancelled.
    """
    parser = argparse.ArgumentParser(description=about)
    parser.add_argument(
        "-c", "--config", help="Read config file. Default: config.json", default="config.json")
    parser.add_argument(
        "-d", "--debug", help="Enable debug output", action="store_true")
    args = parser.parse_args()

    with open(args.config, 'r', encoding='utf-8') as f:
        config = json.load(f)
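
    # A minimal sketch of the expected config.json, inferred from the keys
    # accessed above (host and node ids are assumptions):
    # {
    #     "cc-backend": {"host": "https://cc.example.org", "apikey": "..."},
    #     "htcondor": {"submitnodes": {"conduit": 0, "conduit2": 1}}
    # }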

    cc = CCApi(config, args.debug)

    # Ask HTCondor for all currently running jobs.
    condor_jobs = subprocess.run(
        ["ssh", "conduit2.cs.uni-saarland.de", "condor_q", "-json", "-all",
         "-glob"], capture_output=True, text=True).stdout
    # condor_q prints nothing (not "[]") when no jobs match.
    running_jobs = json.loads(condor_jobs) if condor_jobs.strip() else []

    cc_jobs = cc.getJobs()
    # Map the packed integer job id to (GlobalJobId, EnteredCurrentStatus).
    running_job_dict = {cc._jobIdToInt(job['GlobalJobId']):
                        (job['GlobalJobId'], job['EnteredCurrentStatus'])
                        for job in running_jobs}

    # Stop every job that cluster cockpit still lists as running although
    # HTCondor does not know it (or knows it with a different start time).
    for cc_job in cc_jobs:
        startTime = cc_job['startTime']
        if cc_job['jobId'] not in running_job_dict or \
                abs(startTime - running_job_dict[cc_job['jobId']][1]) > 5:
            data = {
                "jobState": "cancelled",
                "stopTime": get_entered_current_status(cc_job['arrayJobId'], cc_job['jobId'], startTime),
                "cluster": cc_job['cluster'],
                "jobId": cc_job['jobId'],
                "startTime": startTime
            }
            print(data)
            cc.stopJob(cc_job['id'], data)
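
# Usage sketch (the script name and schedule are assumptions, not part of
# this commit): run periodically, e.g. from cron, to clean up leftovers:
#   */30 * * * * /usr/bin/python3 /path/to/stop_jobs.py -c /path/to/config.json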