Initial commit

This commit is contained in:
Michael Schwarz 2022-08-25 15:38:06 +02:00
parent 84d49f2807
commit 54fbc4fa93
5 changed files with 594 additions and 129 deletions

131
.gitignore vendored
View File

@ -1,129 +1,2 @@
# Byte-compiled / optimized / DLL files .vscode
__pycache__/ config.json
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

125
Readme.md Normal file
View File

@ -0,0 +1,125 @@
# Introduction
This script syncs the slurm jobs with the
[cluster cockpit](https://github.com/ClusterCockpit/) backend. It uses the
slurm command line tools to gather the relevant slurm infos and reads the
corresponding info from cluster cockpit via its api. After reading the data,
it stops all jobs in cluster cockpit which are not running any more according
to slurm and afterwards it creates all new running jobs in cluster cockpit.
The script has to run on the slurm controller node and needs permissions
to run the slurm commands squeue and sacct and read data from the slurm
state save location.
# Requirements
This script expects a certain data structure in the output of squeue. We have
noticed during development that squeue --json does not distinguish between
individual CPUs in the resources used and in the output the allocation of CPU 1
and 2 is considered to be the same. However, this may be different for shared
nodes if multiple jobs request a certain set of resources.
The cause can be found in the code of the API interface and is based on the
fact that the core ID is taken modulo the number of cores on a socket.
The included patch corrects this behavior. It is necessary to recompile slurm
for this. The patch is for openapi 0.0.37 but should work with other versions
as well.
# Getting started
The easiest way is to clone the Git repository. This way you always get the latest updates.
git clone https://github.com/pc2/cc-slurm-sync
cd cc-slurm-sync
## Configuration file
Before you start, you have to create a configuration file. You can use
`config.json.example` as a starting point. Simply copy or rename it to
`config.json`.
### Confiuration options
**slurm**
* `squeue` Path to the squeue binary. Defaults to `/usr/bin/squeue`
* `sacct` Path to the sacct binary. Defaults to `/usr/bin/sacct`
* `state_save_location` Statesave location of slurm. This option has no default value and is **mandatory**.
**cc-backend**
* `host` The url of the cc-backend api. Must be a valid url excluding trailing `/api`. This option is **mandatory**.
* `apikey` The JWT token to authenticate against cc-backend. This option is **mandatory**.
**accelerators**
This part describes accelerators which might be used in jobs. The format is as follows:
"accelerators" : {
"n2gpu" : {
"0": "00000000:03:00.0",
"1": "00000000:44:00.0",
"2": "00000000:84:00.0",
"3": "00000000:C4:00.0"
},
"n2dgx" : {
"0": "00000000:07:00.0",
"1": "00000000:0F:00.0",
"2": "00000000:47:00.0",
"3": "00000000:4E:00.0",
"4": "00000000:87:00.0",
"5": "00000000:90:00.0",
"6": "00000000:B7:00.0",
"7": "00000000:BD:00.0"
}
},
The first level (`n2gpu`) describes the prefix of the host names in which corresponding accelerators are installed. The second level describes the ID in Slurm followed by the device id.
How to get this data? It depends on the accelerators. The following example is for a host with four NVidia A100 GPUs. This should be similar on all hosts with NVidia GPUs:
# nvidia-smi
Thu Aug 25 14:50:05 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.47.03 Driver Version: 510.47.03 CUDA Version: 11.6 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 NVIDIA A100-SXM... On | 00000000:03:00.0 Off | 0 |
| N/A 58C P0 267W / 400W | 7040MiB / 40960MiB | 88% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
| 1 NVIDIA A100-SXM... On | 00000000:44:00.0 Off | 0 |
| N/A 59C P0 337W / 400W | 7040MiB / 40960MiB | 96% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
| 2 NVIDIA A100-SXM... On | 00000000:84:00.0 Off | 0 |
| N/A 57C P0 266W / 400W | 7358MiB / 40960MiB | 89% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
| 3 NVIDIA A100-SXM... On | 00000000:C4:00.0 Off | 0 |
| N/A 56C P0 271W / 400W | 7358MiB / 40960MiB | 89% Default |
| | | Disabled |
+-------------------------------+----------------------+----------------------+
You will find the four GPUs identified by ids starting at 0. In the second coloum, you can find the Bus-ID or identifier of the GPU. These are the values which have to be defined in the code example above. The mechanism in the background assumes that all nodes starting with this prefix have the same configuration and assignment of ID to bus ID. So if you have another configuration, you have to start a new prefix, only matching the hosts with this configuration.
**node_regex**
This option is unique to every cluster system. This regex describes the sytax of the hostnames which are used as computing resources in jobs. \ have to be escaped
Example: `^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$`
## Running the script
Simply run `slurm-clusercockpit-sync.py` inside the same directory which contains the config.json file. A brief help is also available:
* `-c, --config` You can use a different config file for testing or other purposes. Otherwise it would use config.json in the actual directory.
* `-j, --jobid` In a test setup it might be useful to sync individual job ids instead of syncing all jobs.
* `-l, --limit` Synchronize only this number of jobs in the respective direction. Stopping a job might take some short time. If a massive amount of jobs have to get stopped, the script might run a long time and miss new starting jobs if they start end end within the execution time of the script.
* `--direction` Mostly a debug option. Only synchronize starting or stopping jobs. The default is both directions.
The script terminates after synchronization of all jobs.
# Getting help
This script is to be seen as an example implementation and may have to be adapted for other installations. I tried to keep the script as general as possible and to catch some differences between clusters already. If adjustments are necessary, I am happy about pull requests or notification about that on other ways to get an implementation that runs on as many systems as possible without adjustments in the long run.

30
config.json.example Normal file
View File

@ -0,0 +1,30 @@
{
"slurm" : {
"squeue": "/usr/bin/squeue",
"sacct": "/usr/bin/sacct",
"state_save_location" : "/var/spool/SLURM/StateSaveLocation"
},
"cc-backend" : {
"host" : "https://some.cc.instance",
"apikey" : "<jwt token>"
},
"accelerators" : {
"n2gpu" : {
"0": "00000000:03:00.0",
"1": "00000000:44:00.0",
"2": "00000000:84:00.0",
"3": "00000000:C4:00.0"
},
"n2dgx" : {
"0": "00000000:07:00.0",
"1": "00000000:0F:00.0",
"2": "00000000:47:00.0",
"3": "00000000:4E:00.0",
"4": "00000000:87:00.0",
"5": "00000000:90:00.0",
"6": "00000000:B7:00.0",
"7": "00000000:BD:00.0"
}
},
"node_regex" : "^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$"
}

13
openapi_0.0.37.patch Normal file
View File

@ -0,0 +1,13 @@
--- slurm-21.08.6_bak/src/plugins/openapi/v0.0.37/jobs.c 2022-02-24 20:10:29.000000000 +0100
+++ slurm-21.08.6/src/plugins/openapi/v0.0.37/jobs.c 2022-07-14 15:02:59.492972019 +0200
@@ -773,9 +773,7 @@
(i /
j->cores_per_socket[sock_inx]));
data_t *core = data_key_set_int(
- cores,
- (i %
- j->cores_per_socket[sock_inx]));
+ cores, i);
if (bit_test(j->core_bitmap_used,
bit_inx)) {

424
slurm-clusercockpit-sync.py Executable file
View File

@ -0,0 +1,424 @@
#!/usr/bin/python3
# This script syncs the slurm jobs with the cluster cockpit backend. It uses
# the slurm command line tools to gather the relevant slurm infos and reads
# the corresponding info from cluster cockpit via its api.
#
# After reading the data, it stops all jobs in cluster cockpit which are
# not running any more according to slurm and afterwards it creates all new
# running jobs in cluster cockpit.
#
# -- Michael Schwarz <schwarz@uni-paderborn.de>
import subprocess
import json
import requests
import re
class CCApi:
config = {}
apiurl = ''
apikey = ''
headers = {}
def __init__(self, config, debug=False):
self.config = config
self.apiurl = "%s/api/" % config['cc-backend']['host']
self.apikey = config['cc-backend']['apikey']
self.headers = { 'accept': 'application/ld+json',
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
def startJob(self, data):
url = self.apiurl+"jobs/start_job/"
r = requests.post(url, headers=self.headers, json=data)
if r.status_code == 201:
return r.json()
else:
print(data)
print(r)
return False
def stopJob(self, data):
url = self.apiurl+"jobs/stop_job/"
r = requests.post(url, headers=self.headers, json=data)
if r.status_code == 200:
return r.json()
else:
print(data)
print(r)
return False
def getJobs(self, filter_running=True):
url = self.apiurl+"jobs/"
if filter_running:
url = url+"?state=running"
r = requests.get(url, headers=self.headers)
if r.status_code == 200:
return r.json()
else:
return { 'jobs' : []}
class SlurmSync:
slurmJobData = {}
ccData = {}
config = {}
debug = False
ccapi = None
def __init__(self, config, debug=False):
self.config = config
self.debug = debug
# validate config TODO
if "slurm" not in config:
raise KeyError
if "squeue" not in config['slurm']:
config.update({'squeue' : '/usr/bin/squeue'})
if "sacct" not in config['slurm']:
config.update({'sacct' : '/usr/bin/sacct'})
if "cc-backend" not in config:
raise KeyError
if "host" not in config['cc-backend']:
raise KeyError
if "apikey" not in config['cc-backend']:
raise KeyError
self.ccapi = CCApi(self.config, debug)
def _exec(self, command):
process = subprocess.Popen(command, stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
output, error = process.communicate()
if process.returncode is 0:
return output.decode('utf-8')
else:
print("Error: ",error)
return ""
def _readSlurmData(self):
if self.debug:
print("DEBUG: _readSlurmData called")
command = "%s --json" % self.config['slurm']['squeue']
self.slurmJobData = json.loads(self._exec(command))
def _readCCData(self):
if self.debug:
print("DEBUG: _readCCBackendData called")
self.ccData = self.ccapi.getJobs()
def _getAccDataForJob(self, jobid):
command = "%s -j %s --json" % (self.config['slurm']['sacct'], jobid)
return json.loads(self._exec(command))
def _jobIdInCC(self, jobid):
for job in self.ccData['jobs']:
if jobid == job['jobId']:
return True
return False
def _jobRunning(self, jobid):
for job in self.slurmJobData['jobs']:
if int(job['job_id']) == int(jobid):
if job['job_state'] == 'RUNNING':
return True
return False
def _getACCIDsFromGRES(self, gres, nodename):
ids = self.config['accelerators']
nodetype = None
for k, v in ids.items():
if nodename.startswith(k):
nodetype = k
if not nodetype:
print("WARNING: Can't find accelerator definition for node %s" % nodename.strip())
return []
# the gres definition might be different on other clusters!
m = re.match(r"(fpga|gpu):(\w+):(\d)\(IDX:([\d,\-]+)\)", gres)
if m:
family = m.group(1)
type = m.group(2)
amount = m.group(3)
indexes = m.group(4)
acc_id_list = []
# IDX might be: IDX:0,2-3
# first split at , then expand the range to individual items
if len(indexes) > 1:
idx_list = indexes.split(',')
idx = []
for i in idx_list:
if len(i) == 1:
idx.append(i)
else:
start = i.split('-')[0]
end = i.split('-')[1]
idx = idx + list(range(int(start), int(end)+1))
indexes = idx
for i in indexes:
acc_id_list.append(ids[nodetype][str(i)])
print(acc_id_list)
return acc_id_list
return []
def _ccStartJob(self, job):
print("INFO: Crate job %s, user %s, name %s" % (job['job_id'], job['user_name'], job['name']))
nodelist = self._convertNodelist(job['job_resources']['nodes'])
# Exclusive job?
if job['shared'] == "none":
exclusive = 1
# exclusive to user
elif job['shared'] == "user":
exclusive = 2
# exclusive to mcs
elif job['shared'] == "mcs":
exclusive = 3
# default is shared node
else:
exclusive = 0
# read job script and environment
hashdir = "hash.%s" % str(job['job_id'])[-1]
jobscript_filename = "%s/%s/job.%s/script" % (self.config['slurm']['state_save_location'], hashdir, job['job_id'])
jobscript = ''
try:
with open(jobscript_filename, 'r', encoding='utf-8') as f:
jobscript = f.read()
except FileNotFoundError:
jobscript = 'NO JOBSCRIPT'
environment = ''
# FIXME sometimes produces utf-8 conversion errors
environment_filename = "%s/%s/job.%s/environment" % (self.config['slurm']['state_save_location'], hashdir, job['job_id'])
try:
with open(environment_filename, 'r', encoding='utf-8') as f:
environment = f.read()
except FileNotFoundError:
environment = 'NO ENV'
except UnicodeDecodeError:
environment = 'UNICODE_DECODE_ERROR'
# truncate environment to 50.000 chars. Otherwise it might not fit into the
# Database field together with the job script etc.
environment = environment[:50000]
# get additional info from slurm and add environment
command = "scontrol show job %s" % job['job_id']
slurminfo = self._exec(command)
slurminfo = slurminfo + "ENV:\n====\n" + environment
# build payload
data = {'jobId' : job['job_id'],
'user' : job['user_name'],
'cluster' : job['cluster'],
'numNodes' : job['node_count'],
'numHwthreads' : job['cpus'],
'startTime': job['start_time'],
'walltime': int(job['time_limit']) * 60,
'project': job['account'],
'partition': job['partition'],
'exclusive': exclusive,
'resources': [],
'metadata': {
'jobName' : job['name'],
'jobScript' : jobscript,
'slurmInfo' : slurminfo
}
}
# is this part of an array job?
if job['array_job_id'] > 0:
data.update({"arrayJobId" : job['array_job_id']})
i = 0
num_acc = 0
for node in nodelist:
# begin dict
resources = {'hostname' : node.strip()}
# if a job uses a node exclusive, there are some assigned cpus (used by this job)
# and some unassigned cpus. In this case, the assigned_cpus are those which have
# to be monitored, otherwise use the unassigned cpus.
hwthreads = job['job_resources']['allocated_nodes'][str(i)]['cores']
cores_assigned = []
cores_unassigned = []
for k,v in hwthreads.items():
if v == "assigned":
cores_assigned.append(int(k))
else:
cores_unassigned.append(int(k))
if len(cores_assigned) > 0:
cores = cores_assigned
else:
cores = cores_unassigned
resources.update({"hwthreads": cores})
# Get allocated GPUs if some are requested
if len(job['gres_detail']) > 0:
gres = job['gres_detail'][i]
acc_ids = self._getACCIDsFromGRES(gres, node)
if len(acc_ids) > 0:
num_acc = num_acc + len(acc_ids)
resources.update({"accelerators" : acc_ids})
data['resources'].append(resources)
i = i + 1
# if the number of accelerators has changed in the meantime, upate this field
data.update({"numAcc" : num_acc})
if self.debug:
print(data)
self.ccapi.startJob(data)
def _ccStopJob(self, jobid):
print("INFO: Stop job %s" % jobid)
# get search for the jobdata stored in CC
ccjob = {}
for j in self.ccData['jobs']:
if j['jobId'] == jobid:
ccjob = j
# check if job is still in squeue data
for job in self.slurmJobData['jobs']:
if job['job_id'] == jobid:
jobstate = job['job_state'].lower()
endtime = job['end_time']
if jobstate == 'requeued':
print("Requeued job")
jobstate = 'failed'
if int(ccjob['startTime']) >= int(job['end_time']):
print("squeue correction")
# For some reason (needs to get investigated), failed jobs sometimes report
# an earlier end time in squee than the starting time in CC. If this is the
# case, take the starting time from CC and add ten seconds to the starting
# time as new end time. Otherwise CC refuses to end the job.
endtime = int(ccjob['startTime']) + 1
else:
jobsAcctData = self._getAccDataForJob(jobid)['jobs']
for j in jobsAcctData:
if j['steps'][0]['time']['start'] == ccjob['startTime']:
jobAcctData = j
jobstate = jobAcctData['state']['current'].lower()
endtime = jobAcctData['time']['end']
if jobstate == "node_fail":
jobstate = "failed"
if jobstate == "requeued":
print("Requeued job")
jobstate = "failed"
if int(ccjob['startTime']) >= int(jobAcctData['time']['end']):
print("sacct correction")
# For some reason (needs to get investigated), failed jobs sometimes report
# an earlier end time in squee than the starting time in CC. If this is the
# case, take the starting time from CC and add ten seconds to the starting
# time as new end time. Otherwise CC refuses to end the job.
endtime = int(ccjob['startTime']) + 1
data = {
'jobId' : jobid,
'cluster' : ccjob['cluster'],
'startTime' : ccjob['startTime'],
'stopTime' : endtime,
'jobState' : jobstate
}
self.ccapi.stopJob(data)
def _convertNodelist(self, nodelist):
# Use slurm to convert a nodelist with ranges into a comma separated list of unique nodes
if re.search(self.config['node_regex'], nodelist):
command = "scontrol show hostname %s | paste -d, -s" % nodelist
retval = self._exec(command).split(',')
return retval
else:
return []
def sync(self, limit=200, jobid=None, direction='both'):
if self.debug:
print("DEBUG: sync called")
print("DEBUG: jobid %s" % jobid)
self._readSlurmData()
self._readCCData()
# Abort after a defined count of sync actions. The intend is, to restart this script after the
# limit is reached. Otherwise, if many many jobs get stopped, the script might miss some new jobs.
sync_count = 0
# iterate over cc jobs and stop them if they have already ended
if direction in ['both', 'stop']:
for job in self.ccData['jobs']:
if jobid:
if int(job['jobId']) == int(jobid) and not self._jobRunning(job['jobId']):
self._ccStopJob(job['jobId'])
sync_count = sync_count + 1
else:
if not self._jobRunning(job['jobId']):
self._ccStopJob(job['jobId'])
sync_count = sync_count + 1
if sync_count >= limit:
print("INFO: sync limit (%s) reached" % limit)
break
sync_count = 0
# iterate over running jobs and add them to cc if they are still missing there
if direction in ['both', 'start']:
for job in self.slurmJobData['jobs']:
# consider only running jobs
if job['job_state'] == "RUNNING":
if jobid:
if int(job['job_id']) == int(jobid) and not self._jobIdInCC(job['job_id']):
self._ccStartJob(job)
sync_count = sync_count + 1
else:
if not self._jobIdInCC(job['job_id']):
self._ccStartJob(job)
sync_count = sync_count + 1
if sync_count >= limit:
print("INFO: sync limit (%s) reached" % limit)
break
if __name__ == "__main__":
import argparse
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
the slurm command line tools to gather the relevant slurm infos and reads
the corresponding info from cluster cockpit via its api.
After reading the data, it stops all jobs in cluster cockpit which are
not running any more according to slurm and afterwards it creates all new
running jobs in cluster cockpit.
"""
parser = argparse.ArgumentParser(description=about)
parser.add_argument("-c", "--config", help="Read config file. Default: config.json", default="config.json")
parser.add_argument("-d", "--debug", help="Enable debug output", action="store_true")
parser.add_argument("-j", "--jobid", help="Sync this jobid")
parser.add_argument("-l", "--limit", help="Stop after n sync actions in each direction. Default: 200", default="200", type=int)
parser.add_argument("--direction", help="Only sync in this direction", default="both", choices=['both', 'start', 'stop'])
args = parser.parse_args()
# open config file
if args.debug:
print("DEBUG: load config file: %s" % args.config)
with open(args.config, 'r', encoding='utf-8') as f:
config = json.load(f)
if args.debug:
print("DEBUG: config file contents:")
print(config)
s = SlurmSync(config, args.debug)
s.sync(args.limit, args.jobid, args.direction)