mirror of
https://gitlab.cs.uni-saarland.de/hpc/cc-condor-sync.git
synced 2024-11-10 02:47:25 +01:00
7a0c41e378
And handle pagination.
126 lines
4.9 KiB
Python
126 lines
4.9 KiB
Python
#!/usr/bin/python3
|
|
import time
|
|
from dateutil import parser as dateparser
|
|
import requests
|
|
import json
|
|
import subprocess
|
|
|
|
|
|
class CCApi:
|
|
config = {}
|
|
apiurl = ''
|
|
apikey = ''
|
|
headers = {}
|
|
debug = False
|
|
|
|
def __init__(self, config, debug=False):
|
|
self.config = config
|
|
self.apiurl = "%s/api/" % config['cc-backend']['host']
|
|
self.apikey = config['cc-backend']['apikey']
|
|
self.headers = {'accept': 'application/ld+json',
|
|
'Content-Type': 'application/json',
|
|
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
|
|
self.debug = debug
|
|
|
|
def stopJob(self, id, data):
|
|
url = self.apiurl+"jobs/stop_job/%d" % id
|
|
r = requests.post(url, headers=self.headers, json=data)
|
|
if r.status_code == 200:
|
|
return r.json()
|
|
else:
|
|
print(data)
|
|
print(r.status_code, r.content)
|
|
return False
|
|
|
|
def getJobs(self, filter_running=True):
|
|
url = self.apiurl+"jobs/"
|
|
if filter_running:
|
|
url = url+"?state=running&items-per-page=100"
|
|
jobs = []
|
|
page = 1
|
|
while True:
|
|
r = requests.get(url + "&page=%d" % page, headers=self.headers)
|
|
page += 1
|
|
if r.status_code == 200:
|
|
new_jobs = r.json()['jobs']
|
|
if len(new_jobs) < 100:
|
|
jobs.extend(new_jobs)
|
|
return jobs
|
|
else:
|
|
jobs.extend(new_jobs)
|
|
else:
|
|
return []
|
|
|
|
def _getSubmitNodeId(self, globalJobId):
|
|
job_id_parts = globalJobId.split('#')
|
|
return self.config['htcondor']['submitnodes'][job_id_parts[0].split('.')[0]]
|
|
|
|
def _jobIdToInt(self, globalJobId):
|
|
job_id_parts = globalJobId.split('#')
|
|
cluster_id, proc_id = [int(id) for id in job_id_parts[1].split('.')]
|
|
return cluster_id << 32 | ((proc_id & 0x3FFFFFFF) << 2) | self._getSubmitNodeId(globalJobId)
|
|
|
|
|
|
def compare_jobs(self, job, cc_job):
|
|
print(self._jobIdToInt(job['GlobalJobId']), cc_job['jobId'])
|
|
return self._jobIdToInt(job['GlobalJobId']) == cc_job['jobId'] and \
|
|
abs(cc_job['startTime'] - job['EnteredCurrentStatus']) < 5
|
|
|
|
def get_entered_current_status(arrayId, jobId, startTime):
|
|
conduit = "conduit" if arrayId % 10 == 0 else "conduit2"
|
|
condor_job_id = str(jobId >> 32) + "." + str((jobId >> 2) & 0x3FFFFFFF)
|
|
historys = subprocess.run(
|
|
["ssh", conduit + ".cs.uni-saarland.de", "condor_history", "-json", condor_job_id, "-limit", "1"], capture_output=True, text=True).stdout
|
|
history = json.loads(historys)
|
|
if len(history) > 0:
|
|
return history[0]['EnteredCurrentStatus']
|
|
querys = subprocess.run(
|
|
["ssh", conduit + ".cs.uni-saarland.de", "condor_q", "-json", condor_job_id], capture_output=True, text=True).stdout
|
|
query = json.loads(querys)
|
|
if len(query) > 0:
|
|
return query[0]['EnteredCurrentStatus']
|
|
return startTime + 1
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
|
the slurm command line tools to gather the relevant slurm infos and reads
|
|
the corresponding info from cluster cockpit via its api.
|
|
|
|
After reading the data, it stops all jobs in cluster cockpit which are
|
|
not running any more according to slurm and afterwards it creates all new
|
|
running jobs in cluster cockpit.
|
|
"""
|
|
parser = argparse.ArgumentParser(description=about)
|
|
parser.add_argument(
|
|
"-c", "--config", help="Read config file. Default: config.json", default="config.json")
|
|
parser.add_argument(
|
|
"-d", "--debug", help="Enable debug output", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
with open(args.config, 'r', encoding='utf-8') as f:
|
|
config = json.load(f)
|
|
|
|
cc = CCApi(config, args.debug)
|
|
|
|
condor_jobs = subprocess.run(
|
|
["ssh", "conduit2.cs.uni-saarland.de", "condor_q", "-json", "-all", "-glob", "-constraint", "\"JobStatus == 2\""], capture_output=True, text=True).stdout
|
|
running_jobs = json.loads(condor_jobs)
|
|
|
|
cc_jobs = cc.getJobs()
|
|
running_job_dict = {cc._jobIdToInt(job['GlobalJobId']): (job['GlobalJobId'], job['JobCurrentStartDate']) for job in running_jobs}
|
|
print("CC jobs:", len(cc_jobs), " Condor jobs:", len(running_job_dict))
|
|
for cc_job in cc_jobs:
|
|
startTime = cc_job['startTime']
|
|
if not cc_job['jobId'] in running_job_dict or abs(startTime - running_job_dict[cc_job['jobId']][1]) > 5:
|
|
data = {
|
|
"jobState": "cancelled",
|
|
"stopTime": get_entered_current_status(cc_job['arrayJobId'], cc_job['jobId'], startTime),
|
|
"cluster": cc_job['cluster'],
|
|
"jobId": cc_job['jobId'],
|
|
"startTime": startTime
|
|
}
|
|
print(data)
|
|
cc.stopJob(cc_job['id'], data)
|
|
|