From 54fbc4fa93e8e3268c471bc04d3768358d6731f7 Mon Sep 17 00:00:00 2001 From: Michael Schwarz Date: Thu, 25 Aug 2022 15:38:06 +0200 Subject: [PATCH] Initial commit --- .gitignore | 131 +---------- Readme.md | 125 +++++++++++ config.json.example | 30 +++ openapi_0.0.37.patch | 13 ++ slurm-clusercockpit-sync.py | 424 ++++++++++++++++++++++++++++++++++++ 5 files changed, 594 insertions(+), 129 deletions(-) create mode 100644 Readme.md create mode 100644 config.json.example create mode 100644 openapi_0.0.37.patch create mode 100755 slurm-clusercockpit-sync.py diff --git a/.gitignore b/.gitignore index b6e4761..91450fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,129 +1,2 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ +.vscode +config.json diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..5d11e01 --- /dev/null +++ b/Readme.md @@ -0,0 +1,125 @@ +# Introduction + +This script syncs the slurm jobs with the +[cluster cockpit](https://github.com/ClusterCockpit/) backend. It uses the +slurm command line tools to gather the relevant slurm infos and reads the +corresponding info from cluster cockpit via its api. After reading the data, +it stops all jobs in cluster cockpit which are not running any more according +to slurm and afterwards it creates all new running jobs in cluster cockpit. + +The script has to run on the slurm controller node and needs permissions +to run the slurm commands squeue and sacct and read data from the slurm +state save location. + +# Requirements + +This script expects a certain data structure in the output of squeue. We have +noticed during development that squeue --json does not distinguish between +individual CPUs in the resources used and in the output the allocation of CPU 1 +and 2 is considered to be the same. However, this may be different for shared +nodes if multiple jobs request a certain set of resources. + +The cause can be found in the code of the API interface and is based on the +fact that the core ID is taken modulo the number of cores on a socket. + +The included patch corrects this behavior. It is necessary to recompile slurm +for this. The patch is for openapi 0.0.37 but should work with other versions +as well. + +# Getting started + +The easiest way is to clone the Git repository. This way you always get the latest updates. + + git clone https://github.com/pc2/cc-slurm-sync + cd cc-slurm-sync + +## Configuration file +Before you start, you have to create a configuration file. You can use +`config.json.example` as a starting point. Simply copy or rename it to +`config.json`. + +### Confiuration options +**slurm** +* `squeue` Path to the squeue binary. Defaults to `/usr/bin/squeue` +* `sacct` Path to the sacct binary. Defaults to `/usr/bin/sacct` +* `state_save_location` Statesave location of slurm. This option has no default value and is **mandatory**. + +**cc-backend** +* `host` The url of the cc-backend api. Must be a valid url excluding trailing `/api`. This option is **mandatory**. +* `apikey` The JWT token to authenticate against cc-backend. This option is **mandatory**. + +**accelerators** + +This part describes accelerators which might be used in jobs. The format is as follows: + + "accelerators" : { + "n2gpu" : { + "0": "00000000:03:00.0", + "1": "00000000:44:00.0", + "2": "00000000:84:00.0", + "3": "00000000:C4:00.0" + }, + "n2dgx" : { + "0": "00000000:07:00.0", + "1": "00000000:0F:00.0", + "2": "00000000:47:00.0", + "3": "00000000:4E:00.0", + "4": "00000000:87:00.0", + "5": "00000000:90:00.0", + "6": "00000000:B7:00.0", + "7": "00000000:BD:00.0" + } + }, + +The first level (`n2gpu`) describes the prefix of the host names in which corresponding accelerators are installed. The second level describes the ID in Slurm followed by the device id. + +How to get this data? It depends on the accelerators. The following example is for a host with four NVidia A100 GPUs. This should be similar on all hosts with NVidia GPUs: + + # nvidia-smi + Thu Aug 25 14:50:05 2022 + +-----------------------------------------------------------------------------+ + | NVIDIA-SMI 510.47.03 Driver Version: 510.47.03 CUDA Version: 11.6 | + |-------------------------------+----------------------+----------------------+ + | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | + | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | + | | | MIG M. | + |===============================+======================+======================| + | 0 NVIDIA A100-SXM... On | 00000000:03:00.0 Off | 0 | + | N/A 58C P0 267W / 400W | 7040MiB / 40960MiB | 88% Default | + | | | Disabled | + +-------------------------------+----------------------+----------------------+ + | 1 NVIDIA A100-SXM... On | 00000000:44:00.0 Off | 0 | + | N/A 59C P0 337W / 400W | 7040MiB / 40960MiB | 96% Default | + | | | Disabled | + +-------------------------------+----------------------+----------------------+ + | 2 NVIDIA A100-SXM... On | 00000000:84:00.0 Off | 0 | + | N/A 57C P0 266W / 400W | 7358MiB / 40960MiB | 89% Default | + | | | Disabled | + +-------------------------------+----------------------+----------------------+ + | 3 NVIDIA A100-SXM... On | 00000000:C4:00.0 Off | 0 | + | N/A 56C P0 271W / 400W | 7358MiB / 40960MiB | 89% Default | + | | | Disabled | + +-------------------------------+----------------------+----------------------+ + +You will find the four GPUs identified by ids starting at 0. In the second coloum, you can find the Bus-ID or identifier of the GPU. These are the values which have to be defined in the code example above. The mechanism in the background assumes that all nodes starting with this prefix have the same configuration and assignment of ID to bus ID. So if you have another configuration, you have to start a new prefix, only matching the hosts with this configuration. + +**node_regex** + +This option is unique to every cluster system. This regex describes the sytax of the hostnames which are used as computing resources in jobs. \ have to be escaped + +Example: `^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$` + +## Running the script + +Simply run `slurm-clusercockpit-sync.py` inside the same directory which contains the config.json file. A brief help is also available: + +* `-c, --config` You can use a different config file for testing or other purposes. Otherwise it would use config.json in the actual directory. +* `-j, --jobid` In a test setup it might be useful to sync individual job ids instead of syncing all jobs. +* `-l, --limit` Synchronize only this number of jobs in the respective direction. Stopping a job might take some short time. If a massive amount of jobs have to get stopped, the script might run a long time and miss new starting jobs if they start end end within the execution time of the script. +* `--direction` Mostly a debug option. Only synchronize starting or stopping jobs. The default is both directions. + +The script terminates after synchronization of all jobs. + +# Getting help + +This script is to be seen as an example implementation and may have to be adapted for other installations. I tried to keep the script as general as possible and to catch some differences between clusters already. If adjustments are necessary, I am happy about pull requests or notification about that on other ways to get an implementation that runs on as many systems as possible without adjustments in the long run. diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..239737a --- /dev/null +++ b/config.json.example @@ -0,0 +1,30 @@ +{ + "slurm" : { + "squeue": "/usr/bin/squeue", + "sacct": "/usr/bin/sacct", + "state_save_location" : "/var/spool/SLURM/StateSaveLocation" + }, + "cc-backend" : { + "host" : "https://some.cc.instance", + "apikey" : "" + }, + "accelerators" : { + "n2gpu" : { + "0": "00000000:03:00.0", + "1": "00000000:44:00.0", + "2": "00000000:84:00.0", + "3": "00000000:C4:00.0" + }, + "n2dgx" : { + "0": "00000000:07:00.0", + "1": "00000000:0F:00.0", + "2": "00000000:47:00.0", + "3": "00000000:4E:00.0", + "4": "00000000:87:00.0", + "5": "00000000:90:00.0", + "6": "00000000:B7:00.0", + "7": "00000000:BD:00.0" + } + }, + "node_regex" : "^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$" +} diff --git a/openapi_0.0.37.patch b/openapi_0.0.37.patch new file mode 100644 index 0000000..c41125d --- /dev/null +++ b/openapi_0.0.37.patch @@ -0,0 +1,13 @@ +--- slurm-21.08.6_bak/src/plugins/openapi/v0.0.37/jobs.c 2022-02-24 20:10:29.000000000 +0100 ++++ slurm-21.08.6/src/plugins/openapi/v0.0.37/jobs.c 2022-07-14 15:02:59.492972019 +0200 +@@ -773,9 +773,7 @@ + (i / + j->cores_per_socket[sock_inx])); + data_t *core = data_key_set_int( +- cores, +- (i % +- j->cores_per_socket[sock_inx])); ++ cores, i); + + if (bit_test(j->core_bitmap_used, + bit_inx)) { diff --git a/slurm-clusercockpit-sync.py b/slurm-clusercockpit-sync.py new file mode 100755 index 0000000..8de963c --- /dev/null +++ b/slurm-clusercockpit-sync.py @@ -0,0 +1,424 @@ +#!/usr/bin/python3 +# This script syncs the slurm jobs with the cluster cockpit backend. It uses +# the slurm command line tools to gather the relevant slurm infos and reads +# the corresponding info from cluster cockpit via its api. +# +# After reading the data, it stops all jobs in cluster cockpit which are +# not running any more according to slurm and afterwards it creates all new +# running jobs in cluster cockpit. +# +# -- Michael Schwarz + +import subprocess +import json +import requests +import re + +class CCApi: + config = {} + apiurl = '' + apikey = '' + headers = {} + + def __init__(self, config, debug=False): + self.config = config + self.apiurl = "%s/api/" % config['cc-backend']['host'] + self.apikey = config['cc-backend']['apikey'] + self.headers = { 'accept': 'application/ld+json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']} + + def startJob(self, data): + url = self.apiurl+"jobs/start_job/" + r = requests.post(url, headers=self.headers, json=data) + if r.status_code == 201: + return r.json() + else: + print(data) + print(r) + return False + + def stopJob(self, data): + url = self.apiurl+"jobs/stop_job/" + r = requests.post(url, headers=self.headers, json=data) + if r.status_code == 200: + return r.json() + else: + print(data) + print(r) + return False + + def getJobs(self, filter_running=True): + url = self.apiurl+"jobs/" + if filter_running: + url = url+"?state=running" + r = requests.get(url, headers=self.headers) + if r.status_code == 200: + return r.json() + else: + return { 'jobs' : []} + +class SlurmSync: + slurmJobData = {} + ccData = {} + config = {} + debug = False + ccapi = None + + def __init__(self, config, debug=False): + self.config = config + self.debug = debug + + # validate config TODO + if "slurm" not in config: + raise KeyError + if "squeue" not in config['slurm']: + config.update({'squeue' : '/usr/bin/squeue'}) + if "sacct" not in config['slurm']: + config.update({'sacct' : '/usr/bin/sacct'}) + if "cc-backend" not in config: + raise KeyError + if "host" not in config['cc-backend']: + raise KeyError + if "apikey" not in config['cc-backend']: + raise KeyError + + self.ccapi = CCApi(self.config, debug) + + def _exec(self, command): + process = subprocess.Popen(command, stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) + output, error = process.communicate() + if process.returncode is 0: + return output.decode('utf-8') + else: + print("Error: ",error) + return "" + + def _readSlurmData(self): + if self.debug: + print("DEBUG: _readSlurmData called") + command = "%s --json" % self.config['slurm']['squeue'] + self.slurmJobData = json.loads(self._exec(command)) + + def _readCCData(self): + if self.debug: + print("DEBUG: _readCCBackendData called") + self.ccData = self.ccapi.getJobs() + + def _getAccDataForJob(self, jobid): + command = "%s -j %s --json" % (self.config['slurm']['sacct'], jobid) + return json.loads(self._exec(command)) + + def _jobIdInCC(self, jobid): + for job in self.ccData['jobs']: + if jobid == job['jobId']: + return True + return False + + def _jobRunning(self, jobid): + for job in self.slurmJobData['jobs']: + if int(job['job_id']) == int(jobid): + if job['job_state'] == 'RUNNING': + return True + return False + + def _getACCIDsFromGRES(self, gres, nodename): + ids = self.config['accelerators'] + + nodetype = None + for k, v in ids.items(): + if nodename.startswith(k): + nodetype = k + + if not nodetype: + print("WARNING: Can't find accelerator definition for node %s" % nodename.strip()) + return [] + + # the gres definition might be different on other clusters! + m = re.match(r"(fpga|gpu):(\w+):(\d)\(IDX:([\d,\-]+)\)", gres) + if m: + family = m.group(1) + type = m.group(2) + amount = m.group(3) + indexes = m.group(4) + acc_id_list = [] + + # IDX might be: IDX:0,2-3 + # first split at , then expand the range to individual items + if len(indexes) > 1: + idx_list = indexes.split(',') + idx = [] + for i in idx_list: + if len(i) == 1: + idx.append(i) + else: + start = i.split('-')[0] + end = i.split('-')[1] + idx = idx + list(range(int(start), int(end)+1)) + indexes = idx + + for i in indexes: + acc_id_list.append(ids[nodetype][str(i)]) + + print(acc_id_list) + return acc_id_list + + return [] + + def _ccStartJob(self, job): + print("INFO: Crate job %s, user %s, name %s" % (job['job_id'], job['user_name'], job['name'])) + nodelist = self._convertNodelist(job['job_resources']['nodes']) + + # Exclusive job? + if job['shared'] == "none": + exclusive = 1 + # exclusive to user + elif job['shared'] == "user": + exclusive = 2 + # exclusive to mcs + elif job['shared'] == "mcs": + exclusive = 3 + # default is shared node + else: + exclusive = 0 + + # read job script and environment + hashdir = "hash.%s" % str(job['job_id'])[-1] + jobscript_filename = "%s/%s/job.%s/script" % (self.config['slurm']['state_save_location'], hashdir, job['job_id']) + jobscript = '' + try: + with open(jobscript_filename, 'r', encoding='utf-8') as f: + jobscript = f.read() + except FileNotFoundError: + jobscript = 'NO JOBSCRIPT' + + environment = '' + # FIXME sometimes produces utf-8 conversion errors + environment_filename = "%s/%s/job.%s/environment" % (self.config['slurm']['state_save_location'], hashdir, job['job_id']) + try: + with open(environment_filename, 'r', encoding='utf-8') as f: + environment = f.read() + except FileNotFoundError: + environment = 'NO ENV' + except UnicodeDecodeError: + environment = 'UNICODE_DECODE_ERROR' + + # truncate environment to 50.000 chars. Otherwise it might not fit into the + # Database field together with the job script etc. + environment = environment[:50000] + + + # get additional info from slurm and add environment + command = "scontrol show job %s" % job['job_id'] + slurminfo = self._exec(command) + slurminfo = slurminfo + "ENV:\n====\n" + environment + + # build payload + data = {'jobId' : job['job_id'], + 'user' : job['user_name'], + 'cluster' : job['cluster'], + 'numNodes' : job['node_count'], + 'numHwthreads' : job['cpus'], + 'startTime': job['start_time'], + 'walltime': int(job['time_limit']) * 60, + 'project': job['account'], + 'partition': job['partition'], + 'exclusive': exclusive, + 'resources': [], + 'metadata': { + 'jobName' : job['name'], + 'jobScript' : jobscript, + 'slurmInfo' : slurminfo + } + } + + # is this part of an array job? + if job['array_job_id'] > 0: + data.update({"arrayJobId" : job['array_job_id']}) + + i = 0 + num_acc = 0 + for node in nodelist: + # begin dict + resources = {'hostname' : node.strip()} + + # if a job uses a node exclusive, there are some assigned cpus (used by this job) + # and some unassigned cpus. In this case, the assigned_cpus are those which have + # to be monitored, otherwise use the unassigned cpus. + hwthreads = job['job_resources']['allocated_nodes'][str(i)]['cores'] + cores_assigned = [] + cores_unassigned = [] + for k,v in hwthreads.items(): + if v == "assigned": + cores_assigned.append(int(k)) + else: + cores_unassigned.append(int(k)) + + if len(cores_assigned) > 0: + cores = cores_assigned + else: + cores = cores_unassigned + resources.update({"hwthreads": cores}) + + # Get allocated GPUs if some are requested + if len(job['gres_detail']) > 0: + + gres = job['gres_detail'][i] + acc_ids = self._getACCIDsFromGRES(gres, node) + if len(acc_ids) > 0: + num_acc = num_acc + len(acc_ids) + resources.update({"accelerators" : acc_ids}) + + + data['resources'].append(resources) + i = i + 1 + + # if the number of accelerators has changed in the meantime, upate this field + data.update({"numAcc" : num_acc}) + + if self.debug: + print(data) + + self.ccapi.startJob(data) + + def _ccStopJob(self, jobid): + print("INFO: Stop job %s" % jobid) + + # get search for the jobdata stored in CC + ccjob = {} + for j in self.ccData['jobs']: + if j['jobId'] == jobid: + ccjob = j + + # check if job is still in squeue data + for job in self.slurmJobData['jobs']: + if job['job_id'] == jobid: + jobstate = job['job_state'].lower() + endtime = job['end_time'] + if jobstate == 'requeued': + print("Requeued job") + jobstate = 'failed' + + if int(ccjob['startTime']) >= int(job['end_time']): + print("squeue correction") + # For some reason (needs to get investigated), failed jobs sometimes report + # an earlier end time in squee than the starting time in CC. If this is the + # case, take the starting time from CC and add ten seconds to the starting + # time as new end time. Otherwise CC refuses to end the job. + endtime = int(ccjob['startTime']) + 1 + + else: + jobsAcctData = self._getAccDataForJob(jobid)['jobs'] + for j in jobsAcctData: + if j['steps'][0]['time']['start'] == ccjob['startTime']: + jobAcctData = j + jobstate = jobAcctData['state']['current'].lower() + endtime = jobAcctData['time']['end'] + + if jobstate == "node_fail": + jobstate = "failed" + if jobstate == "requeued": + print("Requeued job") + jobstate = "failed" + + if int(ccjob['startTime']) >= int(jobAcctData['time']['end']): + print("sacct correction") + # For some reason (needs to get investigated), failed jobs sometimes report + # an earlier end time in squee than the starting time in CC. If this is the + # case, take the starting time from CC and add ten seconds to the starting + # time as new end time. Otherwise CC refuses to end the job. + endtime = int(ccjob['startTime']) + 1 + + data = { + 'jobId' : jobid, + 'cluster' : ccjob['cluster'], + 'startTime' : ccjob['startTime'], + 'stopTime' : endtime, + 'jobState' : jobstate + } + + self.ccapi.stopJob(data) + + def _convertNodelist(self, nodelist): + # Use slurm to convert a nodelist with ranges into a comma separated list of unique nodes + if re.search(self.config['node_regex'], nodelist): + command = "scontrol show hostname %s | paste -d, -s" % nodelist + retval = self._exec(command).split(',') + return retval + else: + return [] + + + def sync(self, limit=200, jobid=None, direction='both'): + if self.debug: + print("DEBUG: sync called") + print("DEBUG: jobid %s" % jobid) + self._readSlurmData() + self._readCCData() + + # Abort after a defined count of sync actions. The intend is, to restart this script after the + # limit is reached. Otherwise, if many many jobs get stopped, the script might miss some new jobs. + sync_count = 0 + + # iterate over cc jobs and stop them if they have already ended + if direction in ['both', 'stop']: + for job in self.ccData['jobs']: + if jobid: + if int(job['jobId']) == int(jobid) and not self._jobRunning(job['jobId']): + self._ccStopJob(job['jobId']) + sync_count = sync_count + 1 + else: + if not self._jobRunning(job['jobId']): + self._ccStopJob(job['jobId']) + sync_count = sync_count + 1 + if sync_count >= limit: + print("INFO: sync limit (%s) reached" % limit) + break + + sync_count = 0 + # iterate over running jobs and add them to cc if they are still missing there + if direction in ['both', 'start']: + for job in self.slurmJobData['jobs']: + # consider only running jobs + if job['job_state'] == "RUNNING": + if jobid: + if int(job['job_id']) == int(jobid) and not self._jobIdInCC(job['job_id']): + self._ccStartJob(job) + sync_count = sync_count + 1 + else: + if not self._jobIdInCC(job['job_id']): + self._ccStartJob(job) + sync_count = sync_count + 1 + if sync_count >= limit: + print("INFO: sync limit (%s) reached" % limit) + break + +if __name__ == "__main__": + import argparse + about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses + the slurm command line tools to gather the relevant slurm infos and reads + the corresponding info from cluster cockpit via its api. + + After reading the data, it stops all jobs in cluster cockpit which are + not running any more according to slurm and afterwards it creates all new + running jobs in cluster cockpit. + """ + parser = argparse.ArgumentParser(description=about) + parser.add_argument("-c", "--config", help="Read config file. Default: config.json", default="config.json") + parser.add_argument("-d", "--debug", help="Enable debug output", action="store_true") + parser.add_argument("-j", "--jobid", help="Sync this jobid") + parser.add_argument("-l", "--limit", help="Stop after n sync actions in each direction. Default: 200", default="200", type=int) + parser.add_argument("--direction", help="Only sync in this direction", default="both", choices=['both', 'start', 'stop']) + args = parser.parse_args() + + # open config file + if args.debug: + print("DEBUG: load config file: %s" % args.config) + with open(args.config, 'r', encoding='utf-8') as f: + config = json.load(f) + if args.debug: + print("DEBUG: config file contents:") + print(config) + + s = SlurmSync(config, args.debug) + s.sync(args.limit, args.jobid, args.direction)