cc-condor-sync/scripts/stop-jobs-no-longer-running.py
#!/usr/bin/python3
import json
import subprocess

import requests


class CCApi:
    config = {}
    apiurl = ''
    apikey = ''
    headers = {}
    debug = False

    def __init__(self, config, debug=False):
        self.config = config
        self.apiurl = "%s/api/" % config['cc-backend']['host']
        self.apikey = config['cc-backend']['apikey']
        self.headers = {'accept': 'application/ld+json',
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
        self.debug = debug
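
    # For illustration: with a hypothetical config {"cc-backend": {"host":
    # "https://cc.example.com", "apikey": "eyJhbGci..."}}, requests go to
    # https://cc.example.com/api/ carrying the header
    # "Authorization: Bearer eyJhbGci..." (typically a JWT for cc-backend).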

    def stopJob(self, id, data):
        # POST the stop payload to cc-backend; on failure, dump the payload
        # and the response for debugging and return False.
        url = self.apiurl + "jobs/stop_job/%d" % id
        r = requests.post(url, headers=self.headers, json=data)
        if r.status_code == 200:
            return r.json()
        else:
            print(data)
            print(r.status_code, r.content)
            return False
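
    # A stop payload, as assembled in __main__ below, looks roughly like
    # this (hypothetical values):
    #   {"jobState": "cancelled", "stopTime": 1683504000,
    #    "cluster": "conduit", "jobId": 53021371269125,
    #    "startTime": 1683500000}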

    def getJobs(self, filter_running=True):
        # Fetch jobs page by page (100 items per page) until a page comes
        # back with fewer than 100 entries, which marks the last page.
        # The query string is always started with "?" here so that the
        # "&page=" suffix below stays valid even without the state filter.
        url = self.apiurl + "jobs/?items-per-page=100"
        if filter_running:
            url += "&state=running"
        jobs = []
        page = 1
        while True:
            r = requests.get(url + "&page=%d" % page, headers=self.headers)
            page += 1
            if r.status_code != 200:
                return []
            new_jobs = r.json()['jobs']
            jobs.extend(new_jobs)
            if len(new_jobs) < 100:
                return jobs
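
    # The fields used from the response (a JSON object with a "jobs" list)
    # are, per job: "id" (the cc-backend database id), "jobId" (the packed
    # integer id, see _jobIdToInt), "arrayJobId", "cluster" and "startTime".
    # Anything else cc-backend returns is ignored by this script.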

    def _getSubmitNodeId(self, globalJobId):
        # A GlobalJobId looks like "<submithost>#<cluster>.<proc>#<timestamp>";
        # map the submit host's short name to its numeric id from the config.
        job_id_parts = globalJobId.split('#')
        return self.config['htcondor']['submitnodes'][job_id_parts[0].split('.')[0]]

    def _jobIdToInt(self, globalJobId):
        # Pack an HTCondor job id into one integer: the cluster id in the
        # bits above 32, the proc id (truncated to 30 bits) shifted left
        # by 2, and the submit node id in the two lowest bits.
        job_id_parts = globalJobId.split('#')
        cluster_id, proc_id = [int(id) for id in job_id_parts[1].split('.')]
        return cluster_id << 32 | ((proc_id & 0x3FFFFFFF) << 2) | self._getSubmitNodeId(globalJobId)
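
    # Worked example (hypothetical job): for
    # "conduit2.cs.uni-saarland.de#12345.3#1683500000" with submitnodes
    # {"conduit": 0, "conduit2": 1}, this yields
    # 12345 << 32 | 3 << 2 | 1 = 53021371269133.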

    def compare_jobs(self, job, cc_job):
        # Two jobs match if the packed ids are equal and the start times
        # agree to within 5 seconds.
        print(self._jobIdToInt(job['GlobalJobId']), cc_job['jobId'])
        return self._jobIdToInt(job['GlobalJobId']) == cc_job['jobId'] and \
            abs(cc_job['startTime'] - job['EnteredCurrentStatus']) < 5


def get_entered_current_status(arrayId, jobId, startTime):
    # Ask the submit node when the job last changed status: first via
    # condor_history, then via condor_q for jobs still in the queue.
    # Falls back to startTime + 1 if neither tool knows the job.
    conduit = "conduit" if arrayId % 10 == 0 else "conduit2"
    # Undo the packing from CCApi._jobIdToInt to recover "<cluster>.<proc>".
    condor_job_id = str(jobId >> 32) + "." + str((jobId >> 2) & 0x3FFFFFFF)
    historys = subprocess.run(
        ["ssh", conduit + ".cs.uni-saarland.de", "condor_history",
         "-json", condor_job_id, "-limit", "1"],
        capture_output=True, text=True).stdout
    # The condor tools can print an empty string instead of "[]" when
    # nothing matches, which json.loads rejects; guard against that.
    history = json.loads(historys) if historys.strip() else []
    if len(history) > 0:
        return history[0]['EnteredCurrentStatus']
    querys = subprocess.run(
        ["ssh", conduit + ".cs.uni-saarland.de", "condor_q",
         "-json", condor_job_id],
        capture_output=True, text=True).stdout
    query = json.loads(querys) if querys.strip() else []
    if len(query) > 0:
        return query[0]['EnteredCurrentStatus']
    return startTime + 1
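
# For reference, the JSON these condor tools emit is a list of ClassAds,
# e.g. (abridged, hypothetical values):
#   [{"GlobalJobId": "conduit2.cs.uni-saarland.de#12345.3#1683500000",
#     "EnteredCurrentStatus": 1683504000, ...}]
# Only EnteredCurrentStatus is read here.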


if __name__ == "__main__":
    import argparse
    about = """This script syncs the HTCondor jobs with the ClusterCockpit backend.
It uses the HTCondor command line tools to gather the relevant job info and
reads the corresponding info from ClusterCockpit via its API. After reading
the data, it stops all jobs in ClusterCockpit which are no longer running
according to HTCondor.
"""
    parser = argparse.ArgumentParser(description=about)
    parser.add_argument(
        "-c", "--config", help="Read config file. Default: config.json", default="config.json")
    parser.add_argument(
        "-d", "--debug", help="Enable debug output", action="store_true")
    args = parser.parse_args()

    with open(args.config, 'r', encoding='utf-8') as f:
        config = json.load(f)
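
    # A config.json for this script might look like the following; the
    # values are placeholders, only the key structure is what the code
    # above actually reads:
    #   {
    #       "cc-backend": {
    #           "host": "https://cc.example.com",
    #           "apikey": "<JWT token>"
    #       },
    #       "htcondor": {
    #           "submitnodes": {"conduit": 0, "conduit2": 1}
    #       }
    #   }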

    cc = CCApi(config, args.debug)

    # Ask the submit node for all jobs HTCondor still considers running
    # (JobStatus == 2 means "Running"); the constraint is quoted once more
    # so it survives the remote shell started by ssh.
    condor_jobs = subprocess.run(
        ["ssh", "conduit2.cs.uni-saarland.de", "condor_q", "-json", "-all",
         "-glob", "-constraint", "\"JobStatus == 2\""],
        capture_output=True, text=True).stdout
    running_jobs = json.loads(condor_jobs) if condor_jobs.strip() else []
    cc_jobs = cc.getJobs()

    # Index the condor side by the packed integer id used as jobId in
    # cc-backend, keeping the start date for the comparison below.
    running_job_dict = {cc._jobIdToInt(job['GlobalJobId']):
                        (job['GlobalJobId'], job['JobCurrentStartDate'])
                        for job in running_jobs}
    print("CC jobs:", len(cc_jobs), " Condor jobs:", len(running_job_dict))

    for cc_job in cc_jobs:
        startTime = cc_job['startTime']
        # Stop a job in ClusterCockpit if HTCondor no longer lists it as
        # running, or if the start times differ by more than 5 seconds
        # (i.e. the packed id now belongs to a different job).
        if cc_job['jobId'] not in running_job_dict or \
                abs(startTime - running_job_dict[cc_job['jobId']][1]) > 5:
            data = {
                "jobState": "cancelled",
                "stopTime": get_entered_current_status(
                    cc_job['arrayJobId'], cc_job['jobId'], startTime),
                "cluster": cc_job['cluster'],
                "jobId": cc_job['jobId'],
                "startTime": startTime
            }
            print(data)
            cc.stopJob(cc_job['id'], data)
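
# Typical invocation (e.g. from cron on the monitoring host; the paths are
# examples, not from this repo):
#   ./stop-jobs-no-longer-running.py -c /etc/cc-condor-sync/config.json
#   ./stop-jobs-no-longer-running.py -d   # with debug output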