#!/usr/bin/python3
"""Stop ClusterCockpit jobs that are no longer running in HTCondor.

Reads the running jobs from cc-backend via its REST API, compares them
against the jobs HTCondor still reports as running, and stops every
ClusterCockpit job that HTCondor no longer knows about (or whose start
time no longer matches, i.e. the packed job id was reused).
"""
import json
import subprocess

import requests


class CCApi:
    """Minimal client for the ClusterCockpit (cc-backend) REST API."""

    def __init__(self, config, debug=False):
        """Build the API URL and auth headers from the parsed config dict.

        config must provide config['cc-backend']['host'] and
        config['cc-backend']['apikey'], plus the
        config['htcondor']['submitnodes'] mapping used for job-id packing.
        """
        self.config = config
        self.apiurl = "%s/api/" % config['cc-backend']['host']
        self.apikey = config['cc-backend']['apikey']
        self.headers = {'accept': 'application/ld+json',
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer %s' % self.apikey}
        self.debug = debug

    def stopJob(self, id, data):
        """POST a stop_job request for the cc-backend job *id*.

        Returns the decoded JSON response on HTTP 200; otherwise prints
        the payload and the error response and returns False.
        """
        url = self.apiurl + "jobs/stop_job/%d" % id
        r = requests.post(url, headers=self.headers, json=data)
        if r.status_code == 200:
            return r.json()
        print(data)
        print(r.status_code, r.content)
        return False

    def getJobs(self, filter_running=True):
        """Return the list of jobs known to cc-backend ([] on any HTTP error)."""
        url = self.apiurl + "jobs/"
        if filter_running:
            url += "?state=running"
        r = requests.get(url, headers=self.headers)
        if r.status_code == 200:
            return r.json()['jobs']
        return []

    def _getSubmitNodeId(self, globalJobId):
        """Map the submit host of an HTCondor GlobalJobId to its integer id.

        GlobalJobId has the form "<host>.<domain>#<cluster>.<proc>#<ts>".
        """
        submit_host = globalJobId.split('#')[0].split('.')[0]
        return self.config['htcondor']['submitnodes'][submit_host]

    def _jobIdToInt(self, globalJobId):
        """Pack an HTCondor GlobalJobId into the integer id used by cc-backend.

        Layout: cluster id in the high bits (<< 32), proc id (masked to
        30 bits) in bits 2..31, submit-node id in the low 2 bits.
        """
        cluster_part = globalJobId.split('#')[1]
        cluster_id, proc_id = (int(part) for part in cluster_part.split('.'))
        return (cluster_id << 32
                | (proc_id & 0x3FFFFFFF) << 2
                | self._getSubmitNodeId(globalJobId))

    def compare_jobs(self, job, cc_job):
        """True when the HTCondor job and the cc-backend job are the same job.

        Matches on the packed job id and on start times agreeing within 5 s.
        """
        if self.debug:
            # Only print when debugging was requested (the original printed
            # unconditionally, spamming stdout on every comparison).
            print(self._jobIdToInt(job['GlobalJobId']), cc_job['jobId'])
        return self._jobIdToInt(job['GlobalJobId']) == cc_job['jobId'] and \
            abs(cc_job['startTime'] - job['EnteredCurrentStatus']) < 5


def get_entered_current_status(arrayId, jobId, startTime):
    """Best-effort lookup of a job's EnteredCurrentStatus timestamp via ssh.

    Asks condor_history first, then condor_q; falls back to startTime + 1
    when the job is unknown to both, so the stop time is always > startTime.
    """
    # NOTE(review): the arrayId % 10 routing between the two submit nodes is
    # kept unchanged -- confirm it matches the site's submit-node setup.
    conduit = "conduit" if arrayId % 10 == 0 else "conduit2"
    # Reverse the bit packing done by CCApi._jobIdToInt.
    condor_job_id = str(jobId >> 32) + "." + str((jobId >> 2) & 0x3FFFFFFF)
    for command in (["condor_history", "-json", condor_job_id, "-limit", "1"],
                    ["condor_q", "-json", condor_job_id]):
        output = subprocess.run(
            ["ssh", conduit + ".cs.uni-saarland.de"] + command,
            capture_output=True, text=True).stdout
        # condor_* tools print nothing (not "[]") when no job matches,
        # which would make json.loads() raise -- treat that as no result.
        records = json.loads(output) if output.strip() else []
        if len(records) > 0:
            return records[0]['EnteredCurrentStatus']
    return startTime + 1


if __name__ == "__main__":
    import argparse
    about = """This script syncs the HTCondor jobs with the cluster cockpit
    backend. It reads the jobs HTCondor reports as running and the
    corresponding running jobs from cluster cockpit via its api, then stops
    every job in cluster cockpit which is not running any more according to
    HTCondor.
    """
    parser = argparse.ArgumentParser(description=about)
    parser.add_argument(
        "-c", "--config", help="Read config file. Default: config.json",
        default="config.json")
    parser.add_argument(
        "-d", "--debug", help="Enable debug output", action="store_true")
    args = parser.parse_args()

    with open(args.config, 'r', encoding='utf-8') as f:
        config = json.load(f)

    cc = CCApi(config, args.debug)

    # All jobs HTCondor still considers running, across all schedds.
    condor_jobs = subprocess.run(
        ["ssh", "conduit2.cs.uni-saarland.de", "condor_q", "-json", "-all",
         "-glob"], capture_output=True, text=True).stdout
    # Guard against empty output (condor_q prints nothing when idle).
    running_jobs = json.loads(condor_jobs) if condor_jobs.strip() else []

    cc_jobs = cc.getJobs()
    # packed job id -> (GlobalJobId, EnteredCurrentStatus) of running jobs
    running_job_dict = {cc._jobIdToInt(job['GlobalJobId']):
                        (job['GlobalJobId'], job['EnteredCurrentStatus'])
                        for job in running_jobs}
    for cc_job in cc_jobs:
        startTime = cc_job['startTime']
        # Stop the cc job when condor no longer runs it, or when the ids
        # collide but the start times differ (the packed id was reused).
        if cc_job['jobId'] not in running_job_dict or \
                abs(startTime - running_job_dict[cc_job['jobId']][1]) > 5:
            data = {
                "jobState": "cancelled",
                "stopTime": get_entered_current_status(
                    cc_job['arrayJobId'], cc_job['jobId'], startTime),
                "cluster": cc_job['cluster'],
                "jobId": cc_job['jobId'],
                "startTime": startTime
            }
            print(data)
            cc.stopJob(cc_job['id'], data)