mirror of
https://gitlab.cs.uni-saarland.de/hpc/cc-condor-sync.git
synced 2025-01-13 05:39:06 +01:00
Start revamping to use htcondor EventLog not slurm
This commit is contained in:
parent
c9aa4095fe
commit
308df9907e
510
htcondor-clusercockpit-push.py
Executable file
510
htcondor-clusercockpit-push.py
Executable file
@ -0,0 +1,510 @@
|
||||
#!/usr/bin/python3
|
||||
# This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
# the slurm command line tools to gather the relevant slurm infos and reads
|
||||
# the corresponding info from cluster cockpit via its api.
|
||||
#
|
||||
# After reading the data, it stops all jobs in cluster cockpit which are
|
||||
# not running any more according to slurm and afterwards it creates all new
|
||||
# running jobs in cluster cockpit.
|
||||
#
|
||||
# -- Michael Schwarz <schwarz@uni-paderborn.de>
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
import re
|
||||
import tailf
|
||||
|
||||
|
||||
class CCApi:
|
||||
config = {}
|
||||
apiurl = ''
|
||||
apikey = ''
|
||||
headers = {}
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.apiurl = "%s/api/" % config['cc-backend']['host']
|
||||
self.apikey = config['cc-backend']['apikey']
|
||||
self.headers = {'accept': 'application/ld+json',
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
|
||||
|
||||
def startJob(self, data):
|
||||
url = self.apiurl+"jobs/start_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 201:
|
||||
return r.json()
|
||||
elif r.status_code == 422:
|
||||
return False
|
||||
else:
|
||||
print(data)
|
||||
print(r)
|
||||
print(r.json())
|
||||
return False
|
||||
|
||||
def stopJob(self, data):
|
||||
url = self.apiurl+"jobs/stop_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
print(data)
|
||||
print(r)
|
||||
return False
|
||||
|
||||
def getJobs(self, filter_running=True):
|
||||
url = self.apiurl+"jobs/"
|
||||
if filter_running:
|
||||
url = url+"?state=running"
|
||||
r = requests.get(url, headers=self.headers)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
return {'jobs': []}
|
||||
|
||||
|
||||
class CondorSync:
|
||||
condorJobData = {}
|
||||
ccData = {}
|
||||
config = {}
|
||||
debug = False
|
||||
ccapi = None
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.debug = debug
|
||||
|
||||
# validate config TODO
|
||||
if "htcondor" not in config:
|
||||
raise KeyError
|
||||
if "eventlog" not in config['htcondor']:
|
||||
raise KeyError
|
||||
if "cc-backend" not in config:
|
||||
raise KeyError
|
||||
if "host" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
if "apikey" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
|
||||
self.ccapi = CCApi(self.config, debug)
|
||||
|
||||
def _exec(self, command):
|
||||
process = subprocess.Popen(
|
||||
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
output, error = process.communicate()
|
||||
if process.returncode == 0:
|
||||
return output.decode('utf-8')
|
||||
else:
|
||||
print("Error: ", error)
|
||||
return ""
|
||||
|
||||
def _readCondorData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readCondorData called")
|
||||
with open(self.config['htcondor']['eventlog']) as f:
|
||||
self.condorJobData = json.load(f)
|
||||
|
||||
def _readCCData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readCCBackendData called")
|
||||
self.ccData = self.ccapi.getJobs()
|
||||
if self.debug:
|
||||
print("DEBUG: ccData:", self.ccData)
|
||||
|
||||
def _getAccDataForJob(self, jobid):
|
||||
raise NotImplementedError
|
||||
command = "%s -j %s --json" % (self.config['slurm']['sacct'], jobid)
|
||||
return json.loads(self._exec(command))
|
||||
|
||||
def _jobIdInCC(self, jobid):
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid == job['jobId']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _jobIdToInt(self, globalJobId):
|
||||
job_id_parts = globalJobId.split('#')
|
||||
submit = self.config['htcondor']['submitnodes'][job_id_parts[0].split('.')[
|
||||
0]]
|
||||
cluster_id, proc_id = [int(id) for id in job_id_parts[1].split('.')]
|
||||
return cluster_id << 32 | ((proc_id & 0x3FFFFFFF) << 2) | submit
|
||||
|
||||
def _jobRunning(self, jobid):
|
||||
for job in self.slurmJobData['jobs']:
|
||||
if int(job['job_id']) == int(jobid):
|
||||
if job['job_state'] == 'RUNNING':
|
||||
return True
|
||||
return False
|
||||
|
||||
def _getACCIDsFromGRES(self, gres, nodename):
|
||||
ids = self.config['accelerators']
|
||||
|
||||
nodetype = None
|
||||
for k, v in ids.items():
|
||||
if nodename.startswith(k):
|
||||
nodetype = k
|
||||
|
||||
if not nodetype:
|
||||
print("WARNING: Can't find accelerator definition for node %s" %
|
||||
nodename.strip())
|
||||
return []
|
||||
|
||||
# the gres definition might be different on other clusters!
|
||||
m = re.match(r"(fpga|gpu):(\w+):(\d)\(IDX:([\d,\-]+)\)", gres)
|
||||
if m:
|
||||
family = m.group(1)
|
||||
type = m.group(2)
|
||||
amount = m.group(3)
|
||||
indexes = m.group(4)
|
||||
acc_id_list = []
|
||||
|
||||
# IDX might be: IDX:0,2-3
|
||||
# first split at , then expand the range to individual items
|
||||
if len(indexes) > 1:
|
||||
idx_list = indexes.split(',')
|
||||
idx = []
|
||||
for i in idx_list:
|
||||
if len(i) == 1:
|
||||
idx.append(i)
|
||||
else:
|
||||
start = i.split('-')[0]
|
||||
end = i.split('-')[1]
|
||||
idx = idx + list(range(int(start), int(end)+1))
|
||||
indexes = idx
|
||||
|
||||
for i in indexes:
|
||||
acc_id_list.append(ids[nodetype][str(i)])
|
||||
|
||||
print(acc_id_list)
|
||||
return acc_id_list
|
||||
|
||||
return []
|
||||
|
||||
def _ccStartJob(self, job):
|
||||
print("INFO: Create job %s, user %s, name %s" %
|
||||
(job['GlobalJobId'], job['Owner'], job['JobBatchName'] if 'JobBatchName' in job else ''))
|
||||
jobId = self._jobIdToInt(job['GlobalJobId'])
|
||||
for j in self.ccData['jobs']:
|
||||
if j['jobId'] == jobId:
|
||||
return
|
||||
|
||||
nodelist = [job['RemoteHost'].split('@')[1]]
|
||||
|
||||
# # Exclusive job?
|
||||
# if job['shared'] == "none":
|
||||
# exclusive = 1
|
||||
# # exclusive to user
|
||||
# elif job['shared'] == "user":
|
||||
# exclusive = 2
|
||||
# # exclusive to mcs
|
||||
# elif job['shared'] == "mcs":
|
||||
# exclusive = 3
|
||||
# # default is shared node
|
||||
# else:
|
||||
exclusive = 0
|
||||
|
||||
# read job script and environment
|
||||
# hashdir = "hash.%s" % str(job['GlobalJobId'])[-1]
|
||||
# jobscript_filename = "%s/%s/job.%s/script" % (
|
||||
# self.config['slurm']['state_save_location'], hashdir, job['GlobalJobId'])
|
||||
# jobscript = ''
|
||||
# try:
|
||||
# with open(jobscript_filename, 'r', encoding='utf-8') as f:
|
||||
# jobscript = f.read()
|
||||
# except FileNotFoundError:
|
||||
# jobscript = 'NO JOBSCRIPT'
|
||||
|
||||
environment = ''
|
||||
# FIXME sometimes produces utf-8 conversion errors
|
||||
# environment_filename = "%s/%s/job.%s/environment" % (
|
||||
# self.config['slurm']['state_save_location'], hashdir, job['GlobalJobId'])
|
||||
# try:
|
||||
# with open(environment_filename, 'r', encoding='utf-8') as f:
|
||||
# environment = f.read()
|
||||
# except FileNotFoundError:
|
||||
# environment = 'NO ENV'
|
||||
# except UnicodeDecodeError:
|
||||
# environment = 'UNICODE_DECODE_ERROR'
|
||||
|
||||
# truncate environment to 50.000 chars. Otherwise it might not fit into the
|
||||
# Database field together with the job script etc.
|
||||
# environment = environment[:50000]
|
||||
|
||||
# # get additional info from slurm and add environment
|
||||
# command = "scontrol show job %s" % job['GlobalJobId']
|
||||
# slurminfo = self._exec(command)
|
||||
# slurminfo = slurminfo + "ENV:\n====\n" + environment
|
||||
|
||||
if job['Subproc'] > 0:
|
||||
print("WARNING: did not expect to see Subproc != 0")
|
||||
|
||||
# build payload
|
||||
data = {'jobId': jobId,
|
||||
'user': job['Owner'],
|
||||
'cluster': self.config['cluster'],
|
||||
'numNodes': job['CurrentHosts'],
|
||||
'numHwthreads': job['CpusProvisioned'],
|
||||
'startTime': job['JobCurrentStartDate'],
|
||||
# 'walltime': int(job['time_limit']) * 60,
|
||||
'project': job['AccountingGroup'],
|
||||
'partition': 'main', # job['partition'],
|
||||
'exclusive': exclusive,
|
||||
'resources': [],
|
||||
'metadata': {
|
||||
'jobName': job['JobBatchName'] if 'JobBatchName' in job else ''
|
||||
}
|
||||
}
|
||||
|
||||
# is this part of an array job?
|
||||
if job['Cluster'] > 0:
|
||||
data.update({"arrayJobId": job['Cluster']})
|
||||
|
||||
num_acc = 0
|
||||
for node in nodelist:
|
||||
# begin dict
|
||||
resources = {'hostname': node.split('.')[0].strip()}
|
||||
|
||||
# if a job uses a node exclusive, there are some assigned cpus (used by this job)
|
||||
# and some unassigned cpus. In this case, the assigned_cpus are those which have
|
||||
# to be monitored, otherwise use the unassigned cpus.
|
||||
# hwthreads = job['job_resources']['allocated_nodes'][str(
|
||||
# i)]['cores']
|
||||
# cores_assigned = []
|
||||
# cores_unassigned = []
|
||||
# for k, v in hwthreads.items():
|
||||
# if v == "assigned":
|
||||
# cores_assigned.append(int(k))
|
||||
# else:
|
||||
# cores_unassigned.append(int(k))
|
||||
|
||||
# if len(cores_assigned) > 0:
|
||||
# cores = cores_assigned
|
||||
# else:
|
||||
# cores = cores_unassigned
|
||||
# resources.update({"hwthreads": cores})
|
||||
|
||||
# Get allocated GPUs if some are requested
|
||||
if 'AssignedGPUs' in job:
|
||||
|
||||
acc_ids = job['AssignedGPUs'].split(',')
|
||||
if len(acc_ids) > 0:
|
||||
num_acc = num_acc + len(acc_ids)
|
||||
resources.update(
|
||||
{"accelerators": [self.config['accelerators'][node][id] for id in acc_ids]})
|
||||
|
||||
data['resources'].append(resources)
|
||||
|
||||
# if the number of accelerators has changed in the meantime, upate this field
|
||||
data.update({"numAcc": num_acc})
|
||||
|
||||
if self.debug:
|
||||
print(data)
|
||||
|
||||
ccjob = self.ccapi.startJob(data)
|
||||
|
||||
def _ccStopJob(self, job):
|
||||
print("INFO: Stop job %s" % job['GlobalJobId'])
|
||||
jobId = self._jobIdToInt(job['GlobalJobId'])
|
||||
|
||||
# get search for the jobdata stored in CC
|
||||
# ccjob = {}
|
||||
# for j in self.ccData['jobs']:
|
||||
# if j['jobId'] == jobId:
|
||||
# ccjob = j
|
||||
|
||||
# # check if job is still in squeue data
|
||||
# for job in self.slurmJobData['jobs']:
|
||||
# if job['job_id'] == jobId:
|
||||
# jobstate = job['job_state'].lower()
|
||||
# endtime = job['end_time']
|
||||
# if jobstate == 'requeued':
|
||||
# print("Requeued job")
|
||||
# jobstate = 'failed'
|
||||
|
||||
# if int(ccjob['startTime']) >= int(job['end_time']):
|
||||
# print("squeue correction")
|
||||
# # For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# # an earlier end time in squee than the starting time in CC. If this is the
|
||||
# # case, take the starting time from CC and add ten seconds to the starting
|
||||
# # time as new end time. Otherwise CC refuses to end the job.
|
||||
# endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
# else:
|
||||
# jobsAcctData = self._getAccDataForJob(jobid)['jobs']
|
||||
# for j in jobsAcctData:
|
||||
# if len(j['steps']) > 0 and j['steps'][0]['time']['start'] == ccjob['startTime']:
|
||||
# jobAcctData = j
|
||||
# jobstate = jobAcctData['state']['current'].lower()
|
||||
# endtime = jobAcctData['time']['end']
|
||||
|
||||
# if jobstate == "node_fail":
|
||||
# jobstate = "failed"
|
||||
# if jobstate == "requeued":
|
||||
# print("Requeued job")
|
||||
# jobstate = "failed"
|
||||
|
||||
# if int(ccjob['startTime']) >= int(jobAcctData['time']['end']):
|
||||
# print("sacct correction")
|
||||
# # For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# # an earlier end time in squee than the starting time in CC. If this is the
|
||||
# # case, take the starting time from CC and add ten seconds to the starting
|
||||
# # time as new end time. Otherwise CC refuses to end the job.
|
||||
# endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
jobstate_map = {4: "canceled", 5: "completed",
|
||||
9: "failed", 10: "stopped", 12: "stopped"}
|
||||
jobstate = jobstate_map[job['TriggerEventTypeNumber']]
|
||||
|
||||
data = {
|
||||
'jobId': jobId,
|
||||
'cluster': self.config['cluster'],
|
||||
'startTime': job['JobCurrentStartDate'],
|
||||
'stopTime': job['ToE']['When'],
|
||||
'jobState': jobstate
|
||||
}
|
||||
|
||||
self.ccapi.stopJob(data)
|
||||
|
||||
def _convertNodelist(self, nodelist):
|
||||
# Use slurm to convert a nodelist with ranges into a comma separated list of unique nodes
|
||||
if re.search(self.config['node_regex'], nodelist):
|
||||
command = "scontrol show hostname %s | paste -d, -s" % nodelist
|
||||
retval = self._exec(command).split(',')
|
||||
return retval
|
||||
else:
|
||||
return []
|
||||
|
||||
def _handleEvent(self, event):
|
||||
# event codes: https://htcondor.readthedocs.io/en/latest/codes-other-values/job-event-log-codes.html
|
||||
if event['EventTypeNumber'] == 28: # JobAdInformationEvent
|
||||
if event['TriggerEventTypeNumber'] == 1: # Execute
|
||||
self._ccStartJob(event)
|
||||
elif event['TriggerEventTypeNumber'] == 4 or event['TriggerEventTypeNumber'] == 5 or \
|
||||
event['TriggerEventTypeNumber'] == 9:
|
||||
self._ccStopJob(event)
|
||||
|
||||
def sync(self, limit=200, jobid=None, direction='both'):
|
||||
if self.debug:
|
||||
print("DEBUG: sync called")
|
||||
print("DEBUG: jobid %s" % jobid)
|
||||
|
||||
self._readCCData()
|
||||
|
||||
with tailf.Tail(self.config['htcondor']['eventlog']) as tail:
|
||||
remaining = ""
|
||||
while True:
|
||||
for event in tail:
|
||||
if isinstance(event, bytes):
|
||||
eventlog = remaining + event.decode("utf-8")
|
||||
decoder = json.JSONDecoder()
|
||||
pos = 0
|
||||
while True:
|
||||
try:
|
||||
event, pos = decoder.raw_decode(eventlog, pos)
|
||||
remaining = ""
|
||||
pos += 1
|
||||
self._handleEvent(event)
|
||||
except json.JSONDecodeError:
|
||||
remaining = eventlog[pos:]
|
||||
break
|
||||
elif event is tailf.Truncated:
|
||||
print("File was truncated")
|
||||
else:
|
||||
assert False, "unreachable" # currently. more events may be introduced later
|
||||
time.sleep(5) # save CPU cycles
|
||||
|
||||
with open(self.config['htcondor']['eventlog'], 'r', encoding='utf-8') as f:
|
||||
eventlog = f.read()
|
||||
|
||||
decoder = json.JSONDecoder()
|
||||
pos = 0
|
||||
while True:
|
||||
try:
|
||||
event, pos = decoder.raw_decode(eventlog, pos)
|
||||
pos += 1
|
||||
self._handleEvent(event)
|
||||
except json.JSONDecodeError:
|
||||
break
|
||||
# self._readCondorData()
|
||||
return
|
||||
|
||||
# Abort after a defined count of sync actions. The intend is, to restart this script after the
|
||||
# limit is reached. Otherwise, if many many jobs get stopped, the script might miss some new jobs.
|
||||
sync_count = 0
|
||||
|
||||
# iterate over cc jobs and stop them if they have already ended
|
||||
if direction in ['both', 'stop']:
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid:
|
||||
if int(job['jobId']) == int(jobid) and not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
sync_count = 0
|
||||
# iterate over running jobs and add them to cc if they are still missing there
|
||||
if direction in ['both', 'start']:
|
||||
for job in self.slurmJobData['jobs']:
|
||||
# Skip this job if the user does not want the metadata of this job to be submitted to ClusterCockpit
|
||||
# The text field admin_comment is used for this. We assume that this field contains a comma seperated
|
||||
# list of flags.
|
||||
if "disable_cc_submission" in job['admin_comment'].split(','):
|
||||
print(
|
||||
"INFO: Job %s: disable_cc_sumbission is set. Continue with next job" % job['job_id'])
|
||||
continue
|
||||
# consider only running jobs
|
||||
if job['job_state'] == "RUNNING":
|
||||
if jobid:
|
||||
if int(job['job_id']) == int(jobid) and not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
the slurm command line tools to gather the relevant slurm infos and reads
|
||||
the corresponding info from cluster cockpit via its api.
|
||||
|
||||
After reading the data, it stops all jobs in cluster cockpit which are
|
||||
not running any more according to slurm and afterwards it creates all new
|
||||
running jobs in cluster cockpit.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description=about)
|
||||
parser.add_argument(
|
||||
"-c", "--config", help="Read config file. Default: config.json", default="config.json")
|
||||
parser.add_argument(
|
||||
"-d", "--debug", help="Enable debug output", action="store_true")
|
||||
parser.add_argument("-j", "--jobid", help="Sync this jobid")
|
||||
parser.add_argument(
|
||||
"-l", "--limit", help="Stop after n sync actions in each direction. Default: 200", default="200", type=int)
|
||||
parser.add_argument("--direction", help="Only sync in this direction",
|
||||
default="both", choices=['both', 'start', 'stop'])
|
||||
args = parser.parse_args()
|
||||
|
||||
# open config file
|
||||
if args.debug:
|
||||
print("DEBUG: load config file: %s" % args.config)
|
||||
with open(args.config, 'r', encoding='utf-8') as f:
|
||||
config = json.load(f)
|
||||
if args.debug:
|
||||
print("DEBUG: config file contents:")
|
||||
print(config)
|
||||
|
||||
s = CondorSync(config, args.debug)
|
||||
s.sync(args.limit, args.jobid, args.direction)
|
47
map-gpu-ids.py
Normal file
47
map-gpu-ids.py
Normal file
@ -0,0 +1,47 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from io import StringIO
|
||||
import pandas as pd
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
def fetch_condor_machines():
|
||||
compact_result = subprocess.run(
|
||||
["ssh", "conduit", "condor_status", "-compact"], capture_output=True, text=True)
|
||||
data = pd.read_csv(StringIO(compact_result.stdout),
|
||||
sep='\s+', skipfooter=5, engine="python")
|
||||
return data["Machine"]
|
||||
|
||||
def mapping_for_machine(host):
|
||||
machineAds = subprocess.run(
|
||||
["ssh", "conduit", "condor_status", "-json", host], capture_output=True, text=True)
|
||||
info = json.loads(machineAds.stdout)
|
||||
mapping = {}
|
||||
for slot in info:
|
||||
if 'DetectedGPUs' in slot and not 'ParentSlotId' in slot:
|
||||
detected = [name.strip() for name in slot['DetectedGPUs'].split(',')]
|
||||
for name in detected:
|
||||
snake = name.replace('-', '_').strip()
|
||||
if 'GPUs_' + snake in slot:
|
||||
mapping[name] = slot['GPUs_' + snake]['DevicePciBusId']
|
||||
elif snake + 'DevicePciBusId' in slot:
|
||||
mapping[name] = slot[snake + 'DevicePciBusId']
|
||||
return mapping
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
about = """This script reads a map from "AssignedGPUs" names to the PCIe bus ids.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description=about)
|
||||
parser.add_argument("--host", help="The host to map for.", default="")
|
||||
parser.add_argument("-d", "--debug", help="Enable debug output", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
if len(args.host) > 0:
|
||||
print(json.dumps({args.host: mapping_for_machine(args.host)}))
|
||||
else:
|
||||
machines = fetch_condor_machines()
|
||||
mappings = {}
|
||||
for machine in machines:
|
||||
mappings[machine] = mapping_for_machine(machine)
|
||||
print(json.dumps(mappings))
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@ -0,0 +1 @@
|
||||
tailf==0.2.5
|
@ -1,430 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
# This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
# the slurm command line tools to gather the relevant slurm infos and reads
|
||||
# the corresponding info from cluster cockpit via its api.
|
||||
#
|
||||
# After reading the data, it stops all jobs in cluster cockpit which are
|
||||
# not running any more according to slurm and afterwards it creates all new
|
||||
# running jobs in cluster cockpit.
|
||||
#
|
||||
# -- Michael Schwarz <schwarz@uni-paderborn.de>
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import requests
|
||||
import re
|
||||
|
||||
class CCApi:
|
||||
config = {}
|
||||
apiurl = ''
|
||||
apikey = ''
|
||||
headers = {}
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.apiurl = "%s/api/" % config['cc-backend']['host']
|
||||
self.apikey = config['cc-backend']['apikey']
|
||||
self.headers = { 'accept': 'application/ld+json',
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
|
||||
|
||||
def startJob(self, data):
|
||||
url = self.apiurl+"jobs/start_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 201:
|
||||
return r.json()
|
||||
else:
|
||||
print(data)
|
||||
print(r)
|
||||
return False
|
||||
|
||||
def stopJob(self, data):
|
||||
url = self.apiurl+"jobs/stop_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
print(data)
|
||||
print(r)
|
||||
return False
|
||||
|
||||
def getJobs(self, filter_running=True):
|
||||
url = self.apiurl+"jobs/"
|
||||
if filter_running:
|
||||
url = url+"?state=running"
|
||||
r = requests.get(url, headers=self.headers)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
return { 'jobs' : []}
|
||||
|
||||
class SlurmSync:
|
||||
slurmJobData = {}
|
||||
ccData = {}
|
||||
config = {}
|
||||
debug = False
|
||||
ccapi = None
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.debug = debug
|
||||
|
||||
# validate config TODO
|
||||
if "slurm" not in config:
|
||||
raise KeyError
|
||||
if "squeue" not in config['slurm']:
|
||||
config.update({'squeue' : '/usr/bin/squeue'})
|
||||
if "sacct" not in config['slurm']:
|
||||
config.update({'sacct' : '/usr/bin/sacct'})
|
||||
if "cc-backend" not in config:
|
||||
raise KeyError
|
||||
if "host" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
if "apikey" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
|
||||
self.ccapi = CCApi(self.config, debug)
|
||||
|
||||
def _exec(self, command):
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
|
||||
output, error = process.communicate()
|
||||
if process.returncode is 0:
|
||||
return output.decode('utf-8')
|
||||
else:
|
||||
print("Error: ",error)
|
||||
return ""
|
||||
|
||||
def _readSlurmData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readSlurmData called")
|
||||
command = "%s --json" % self.config['slurm']['squeue']
|
||||
self.slurmJobData = json.loads(self._exec(command))
|
||||
|
||||
def _readCCData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readCCBackendData called")
|
||||
self.ccData = self.ccapi.getJobs()
|
||||
|
||||
def _getAccDataForJob(self, jobid):
|
||||
command = "%s -j %s --json" % (self.config['slurm']['sacct'], jobid)
|
||||
return json.loads(self._exec(command))
|
||||
|
||||
def _jobIdInCC(self, jobid):
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid == job['jobId']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _jobRunning(self, jobid):
|
||||
for job in self.slurmJobData['jobs']:
|
||||
if int(job['job_id']) == int(jobid):
|
||||
if job['job_state'] == 'RUNNING':
|
||||
return True
|
||||
return False
|
||||
|
||||
def _getACCIDsFromGRES(self, gres, nodename):
|
||||
ids = self.config['accelerators']
|
||||
|
||||
nodetype = None
|
||||
for k, v in ids.items():
|
||||
if nodename.startswith(k):
|
||||
nodetype = k
|
||||
|
||||
if not nodetype:
|
||||
print("WARNING: Can't find accelerator definition for node %s" % nodename.strip())
|
||||
return []
|
||||
|
||||
# the gres definition might be different on other clusters!
|
||||
m = re.match(r"(fpga|gpu):(\w+):(\d)\(IDX:([\d,\-]+)\)", gres)
|
||||
if m:
|
||||
family = m.group(1)
|
||||
type = m.group(2)
|
||||
amount = m.group(3)
|
||||
indexes = m.group(4)
|
||||
acc_id_list = []
|
||||
|
||||
# IDX might be: IDX:0,2-3
|
||||
# first split at , then expand the range to individual items
|
||||
if len(indexes) > 1:
|
||||
idx_list = indexes.split(',')
|
||||
idx = []
|
||||
for i in idx_list:
|
||||
if len(i) == 1:
|
||||
idx.append(i)
|
||||
else:
|
||||
start = i.split('-')[0]
|
||||
end = i.split('-')[1]
|
||||
idx = idx + list(range(int(start), int(end)+1))
|
||||
indexes = idx
|
||||
|
||||
for i in indexes:
|
||||
acc_id_list.append(ids[nodetype][str(i)])
|
||||
|
||||
print(acc_id_list)
|
||||
return acc_id_list
|
||||
|
||||
return []
|
||||
|
||||
def _ccStartJob(self, job):
|
||||
print("INFO: Crate job %s, user %s, name %s" % (job['job_id'], job['user_name'], job['name']))
|
||||
nodelist = self._convertNodelist(job['job_resources']['nodes'])
|
||||
|
||||
# Exclusive job?
|
||||
if job['shared'] == "none":
|
||||
exclusive = 1
|
||||
# exclusive to user
|
||||
elif job['shared'] == "user":
|
||||
exclusive = 2
|
||||
# exclusive to mcs
|
||||
elif job['shared'] == "mcs":
|
||||
exclusive = 3
|
||||
# default is shared node
|
||||
else:
|
||||
exclusive = 0
|
||||
|
||||
# read job script and environment
|
||||
hashdir = "hash.%s" % str(job['job_id'])[-1]
|
||||
jobscript_filename = "%s/%s/job.%s/script" % (self.config['slurm']['state_save_location'], hashdir, job['job_id'])
|
||||
jobscript = ''
|
||||
try:
|
||||
with open(jobscript_filename, 'r', encoding='utf-8') as f:
|
||||
jobscript = f.read()
|
||||
except FileNotFoundError:
|
||||
jobscript = 'NO JOBSCRIPT'
|
||||
|
||||
environment = ''
|
||||
# FIXME sometimes produces utf-8 conversion errors
|
||||
environment_filename = "%s/%s/job.%s/environment" % (self.config['slurm']['state_save_location'], hashdir, job['job_id'])
|
||||
try:
|
||||
with open(environment_filename, 'r', encoding='utf-8') as f:
|
||||
environment = f.read()
|
||||
except FileNotFoundError:
|
||||
environment = 'NO ENV'
|
||||
except UnicodeDecodeError:
|
||||
environment = 'UNICODE_DECODE_ERROR'
|
||||
|
||||
# truncate environment to 50.000 chars. Otherwise it might not fit into the
|
||||
# Database field together with the job script etc.
|
||||
environment = environment[:50000]
|
||||
|
||||
|
||||
# get additional info from slurm and add environment
|
||||
command = "scontrol show job %s" % job['job_id']
|
||||
slurminfo = self._exec(command)
|
||||
slurminfo = slurminfo + "ENV:\n====\n" + environment
|
||||
|
||||
# build payload
|
||||
data = {'jobId' : job['job_id'],
|
||||
'user' : job['user_name'],
|
||||
'cluster' : job['cluster'],
|
||||
'numNodes' : job['node_count'],
|
||||
'numHwthreads' : job['cpus'],
|
||||
'startTime': job['start_time'],
|
||||
'walltime': int(job['time_limit']) * 60,
|
||||
'project': job['account'],
|
||||
'partition': job['partition'],
|
||||
'exclusive': exclusive,
|
||||
'resources': [],
|
||||
'metadata': {
|
||||
'jobName' : job['name'],
|
||||
'jobScript' : jobscript,
|
||||
'slurmInfo' : slurminfo
|
||||
}
|
||||
}
|
||||
|
||||
# is this part of an array job?
|
||||
if job['array_job_id'] > 0:
|
||||
data.update({"arrayJobId" : job['array_job_id']})
|
||||
|
||||
i = 0
|
||||
num_acc = 0
|
||||
for node in nodelist:
|
||||
# begin dict
|
||||
resources = {'hostname' : node.strip()}
|
||||
|
||||
# if a job uses a node exclusive, there are some assigned cpus (used by this job)
|
||||
# and some unassigned cpus. In this case, the assigned_cpus are those which have
|
||||
# to be monitored, otherwise use the unassigned cpus.
|
||||
hwthreads = job['job_resources']['allocated_nodes'][str(i)]['cores']
|
||||
cores_assigned = []
|
||||
cores_unassigned = []
|
||||
for k,v in hwthreads.items():
|
||||
if v == "assigned":
|
||||
cores_assigned.append(int(k))
|
||||
else:
|
||||
cores_unassigned.append(int(k))
|
||||
|
||||
if len(cores_assigned) > 0:
|
||||
cores = cores_assigned
|
||||
else:
|
||||
cores = cores_unassigned
|
||||
resources.update({"hwthreads": cores})
|
||||
|
||||
# Get allocated GPUs if some are requested
|
||||
if len(job['gres_detail']) > 0:
|
||||
|
||||
gres = job['gres_detail'][i]
|
||||
acc_ids = self._getACCIDsFromGRES(gres, node)
|
||||
if len(acc_ids) > 0:
|
||||
num_acc = num_acc + len(acc_ids)
|
||||
resources.update({"accelerators" : acc_ids})
|
||||
|
||||
|
||||
data['resources'].append(resources)
|
||||
i = i + 1
|
||||
|
||||
# if the number of accelerators has changed in the meantime, upate this field
|
||||
data.update({"numAcc" : num_acc})
|
||||
|
||||
if self.debug:
|
||||
print(data)
|
||||
|
||||
self.ccapi.startJob(data)
|
||||
|
||||
def _ccStopJob(self, jobid):
|
||||
print("INFO: Stop job %s" % jobid)
|
||||
|
||||
# get search for the jobdata stored in CC
|
||||
ccjob = {}
|
||||
for j in self.ccData['jobs']:
|
||||
if j['jobId'] == jobid:
|
||||
ccjob = j
|
||||
|
||||
# check if job is still in squeue data
|
||||
for job in self.slurmJobData['jobs']:
|
||||
if job['job_id'] == jobid:
|
||||
jobstate = job['job_state'].lower()
|
||||
endtime = job['end_time']
|
||||
if jobstate == 'requeued':
|
||||
print("Requeued job")
|
||||
jobstate = 'failed'
|
||||
|
||||
if int(ccjob['startTime']) >= int(job['end_time']):
|
||||
print("squeue correction")
|
||||
# For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# an earlier end time in squee than the starting time in CC. If this is the
|
||||
# case, take the starting time from CC and add ten seconds to the starting
|
||||
# time as new end time. Otherwise CC refuses to end the job.
|
||||
endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
else:
|
||||
jobsAcctData = self._getAccDataForJob(jobid)['jobs']
|
||||
for j in jobsAcctData:
|
||||
if len(j['steps']) > 0 and j['steps'][0]['time']['start'] == ccjob['startTime']:
|
||||
jobAcctData = j
|
||||
jobstate = jobAcctData['state']['current'].lower()
|
||||
endtime = jobAcctData['time']['end']
|
||||
|
||||
if jobstate == "node_fail":
|
||||
jobstate = "failed"
|
||||
if jobstate == "requeued":
|
||||
print("Requeued job")
|
||||
jobstate = "failed"
|
||||
|
||||
if int(ccjob['startTime']) >= int(jobAcctData['time']['end']):
|
||||
print("sacct correction")
|
||||
# For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# an earlier end time in squee than the starting time in CC. If this is the
|
||||
# case, take the starting time from CC and add ten seconds to the starting
|
||||
# time as new end time. Otherwise CC refuses to end the job.
|
||||
endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
data = {
|
||||
'jobId' : jobid,
|
||||
'cluster' : ccjob['cluster'],
|
||||
'startTime' : ccjob['startTime'],
|
||||
'stopTime' : endtime,
|
||||
'jobState' : jobstate
|
||||
}
|
||||
|
||||
self.ccapi.stopJob(data)
|
||||
|
||||
def _convertNodelist(self, nodelist):
|
||||
# Use slurm to convert a nodelist with ranges into a comma separated list of unique nodes
|
||||
if re.search(self.config['node_regex'], nodelist):
|
||||
command = "scontrol show hostname %s | paste -d, -s" % nodelist
|
||||
retval = self._exec(command).split(',')
|
||||
return retval
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def sync(self, limit=200, jobid=None, direction='both'):
|
||||
if self.debug:
|
||||
print("DEBUG: sync called")
|
||||
print("DEBUG: jobid %s" % jobid)
|
||||
self._readSlurmData()
|
||||
self._readCCData()
|
||||
|
||||
# Abort after a defined count of sync actions. The intend is, to restart this script after the
|
||||
# limit is reached. Otherwise, if many many jobs get stopped, the script might miss some new jobs.
|
||||
sync_count = 0
|
||||
|
||||
# iterate over cc jobs and stop them if they have already ended
|
||||
if direction in ['both', 'stop']:
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid:
|
||||
if int(job['jobId']) == int(jobid) and not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
sync_count = 0
|
||||
# iterate over running jobs and add them to cc if they are still missing there
|
||||
if direction in ['both', 'start']:
|
||||
for job in self.slurmJobData['jobs']:
|
||||
# Skip this job if the user does not want the metadata of this job to be submitted to ClusterCockpit
|
||||
# The text field admin_comment is used for this. We assume that this field contains a comma seperated
|
||||
# list of flags.
|
||||
if "disable_cc_submission" in job['admin_comment'].split(','):
|
||||
print("INFO: Job %s: disable_cc_sumbission is set. Continue with next job" % job['job_id'])
|
||||
continue
|
||||
# consider only running jobs
|
||||
if job['job_state'] == "RUNNING":
|
||||
if jobid:
|
||||
if int(job['job_id']) == int(jobid) and not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
the slurm command line tools to gather the relevant slurm infos and reads
|
||||
the corresponding info from cluster cockpit via its api.
|
||||
|
||||
After reading the data, it stops all jobs in cluster cockpit which are
|
||||
not running any more according to slurm and afterwards it creates all new
|
||||
running jobs in cluster cockpit.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description=about)
|
||||
parser.add_argument("-c", "--config", help="Read config file. Default: config.json", default="config.json")
|
||||
parser.add_argument("-d", "--debug", help="Enable debug output", action="store_true")
|
||||
parser.add_argument("-j", "--jobid", help="Sync this jobid")
|
||||
parser.add_argument("-l", "--limit", help="Stop after n sync actions in each direction. Default: 200", default="200", type=int)
|
||||
parser.add_argument("--direction", help="Only sync in this direction", default="both", choices=['both', 'start', 'stop'])
|
||||
args = parser.parse_args()
|
||||
|
||||
# open config file
|
||||
if args.debug:
|
||||
print("DEBUG: load config file: %s" % args.config)
|
||||
with open(args.config, 'r', encoding='utf-8') as f:
|
||||
config = json.load(f)
|
||||
if args.debug:
|
||||
print("DEBUG: config file contents:")
|
||||
print(config)
|
||||
|
||||
s = SlurmSync(config, args.debug)
|
||||
s.sync(args.limit, args.jobid, args.direction)
|
Loading…
Reference in New Issue
Block a user