mirror of
https://gitlab.cs.uni-saarland.de/hpc/cc-condor-sync.git
synced 2024-12-26 05:29:06 +01:00
Restructure.
This commit is contained in:
parent
a3ca962d84
commit
b530d9034e
@ -22,7 +22,7 @@ endif()
|
||||
add_subdirectory(curlpp)
|
||||
set_property(TARGET curlpp_static PROPERTY POSITION_INDEPENDENT_CODE ON)
|
||||
|
||||
add_library(htcondor_cc_sync_plugin MODULE htcondor_cc_sync_plugin.cpp)
|
||||
add_library(htcondor_cc_sync_plugin MODULE src/htcondor_cc_sync_plugin.cpp)
|
||||
|
||||
target_include_directories(htcondor_cc_sync_plugin PRIVATE
|
||||
${CONDOR_SRC_SRC}
|
||||
|
21
LICENSE
21
LICENSE
@ -1,21 +0,0 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2022 Paderborn Center for Parallel Computing
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
125
Readme.md
125
Readme.md
@ -1,125 +0,0 @@
|
||||
# Introduction
|
||||
|
||||
This script syncs the slurm jobs with the
|
||||
[cluster cockpit](https://github.com/ClusterCockpit/) backend. It uses the
|
||||
slurm command line tools to gather the relevant slurm infos and reads the
|
||||
corresponding info from cluster cockpit via its api. After reading the data,
|
||||
it stops all jobs in cluster cockpit which are not running any more according
|
||||
to slurm and afterwards it creates all new running jobs in cluster cockpit.
|
||||
|
||||
The script has to run on the slurm controller node and needs permissions
|
||||
to run the slurm commands squeue and sacct and read data from the slurm
|
||||
state save location.
|
||||
|
||||
# Requirements
|
||||
|
||||
This script expects a certain data structure in the output of squeue. We have
|
||||
noticed during development that `squeue --json` does not distinguish between
|
||||
individual CPUs in the resources used and in the output the allocation of CPU 1
|
||||
and 2 is considered to be the same. However, this may be different for shared
|
||||
nodes if multiple jobs request a certain set of resources.
|
||||
|
||||
The cause can be found in the code of the API interface and is based on the
|
||||
fact that the core ID is taken modulo the number of cores on a socket.
|
||||
|
||||
The included patch corrects this behavior. It is necessary to recompile slurm
|
||||
for this. The patch is for openapi 0.0.37 but should work with other versions
|
||||
as well.
|
||||
|
||||
# Getting started
|
||||
|
||||
The easiest way is to clone the Git repository. This way you always get the latest updates.
|
||||
|
||||
git clone https://github.com/pc2/cc-slurm-sync.git
|
||||
cd cc-slurm-sync
|
||||
|
||||
## Configuration file
|
||||
Before you start, you have to create a configuration file. You can use
|
||||
`config.json.example` as a starting point. Simply copy or rename it to
|
||||
`config.json`.
|
||||
|
||||
### Confiuration options
|
||||
**slurm**
|
||||
* `squeue` Path to the squeue binary. Defaults to `/usr/bin/squeue`
|
||||
* `sacct` Path to the sacct binary. Defaults to `/usr/bin/sacct`
|
||||
* `state_save_location` Statesave location of slurm. This option has no default value and is **mandatory**.
|
||||
|
||||
**cc-backend**
|
||||
* `host` The url of the cc-backend api. Must be a valid url excluding trailing `/api`. This option is **mandatory**.
|
||||
* `apikey` The JWT token to authenticate against cc-backend. This option is **mandatory**.
|
||||
|
||||
**accelerators**
|
||||
|
||||
This part describes accelerators which might be used in jobs. The format is as follows:
|
||||
|
||||
"accelerators" : {
|
||||
"n2gpu" : {
|
||||
"0": "00000000:03:00.0",
|
||||
"1": "00000000:44:00.0",
|
||||
"2": "00000000:84:00.0",
|
||||
"3": "00000000:C4:00.0"
|
||||
},
|
||||
"n2dgx" : {
|
||||
"0": "00000000:07:00.0",
|
||||
"1": "00000000:0F:00.0",
|
||||
"2": "00000000:47:00.0",
|
||||
"3": "00000000:4E:00.0",
|
||||
"4": "00000000:87:00.0",
|
||||
"5": "00000000:90:00.0",
|
||||
"6": "00000000:B7:00.0",
|
||||
"7": "00000000:BD:00.0"
|
||||
}
|
||||
},
|
||||
|
||||
The first level (`n2gpu`) describes the prefix of the host names in which corresponding accelerators are installed. The second level describes the ID in Slurm followed by the device id.
|
||||
|
||||
How to get this data? It depends on the accelerators. The following example is for a host with four NVidia A100 GPUs. This should be similar on all hosts with NVidia GPUs:
|
||||
|
||||
# nvidia-smi
|
||||
Thu Aug 25 14:50:05 2022
|
||||
+-----------------------------------------------------------------------------+
|
||||
| NVIDIA-SMI 510.47.03 Driver Version: 510.47.03 CUDA Version: 11.6 |
|
||||
|-------------------------------+----------------------+----------------------+
|
||||
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
|
||||
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|
||||
| | | MIG M. |
|
||||
|===============================+======================+======================|
|
||||
| 0 NVIDIA A100-SXM... On | 00000000:03:00.0 Off | 0 |
|
||||
| N/A 58C P0 267W / 400W | 7040MiB / 40960MiB | 88% Default |
|
||||
| | | Disabled |
|
||||
+-------------------------------+----------------------+----------------------+
|
||||
| 1 NVIDIA A100-SXM... On | 00000000:44:00.0 Off | 0 |
|
||||
| N/A 59C P0 337W / 400W | 7040MiB / 40960MiB | 96% Default |
|
||||
| | | Disabled |
|
||||
+-------------------------------+----------------------+----------------------+
|
||||
| 2 NVIDIA A100-SXM... On | 00000000:84:00.0 Off | 0 |
|
||||
| N/A 57C P0 266W / 400W | 7358MiB / 40960MiB | 89% Default |
|
||||
| | | Disabled |
|
||||
+-------------------------------+----------------------+----------------------+
|
||||
| 3 NVIDIA A100-SXM... On | 00000000:C4:00.0 Off | 0 |
|
||||
| N/A 56C P0 271W / 400W | 7358MiB / 40960MiB | 89% Default |
|
||||
| | | Disabled |
|
||||
+-------------------------------+----------------------+----------------------+
|
||||
|
||||
You will find the four GPUs identified by ids starting at 0. In the second coloum, you can find the Bus-ID or identifier of the GPU. These are the values which have to be defined in the code example above. The mechanism in the background assumes that all nodes starting with this prefix have the same configuration and assignment of ID to bus ID. So if you have another configuration, you have to start a new prefix, only matching the hosts with this configuration.
|
||||
|
||||
**node_regex**
|
||||
|
||||
This option is unique to every cluster system. This regex describes the sytax of the hostnames which are used as computing resources in jobs. \ have to be escaped
|
||||
|
||||
Example: `^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$`
|
||||
|
||||
## Running the script
|
||||
|
||||
Simply run `slurm-clusercockpit-sync.py` inside the same directory which contains the config.json file. A brief help is also available:
|
||||
|
||||
* `-c, --config` You can use a different config file for testing or other purposes. Otherwise it would use config.json in the actual directory.
|
||||
* `-j, --jobid` In a test setup it might be useful to sync individual job ids instead of syncing all jobs.
|
||||
* `-l, --limit` Synchronize only this number of jobs in the respective direction. Stopping a job might take some short time. If a massive amount of jobs have to get stopped, the script might run a long time and miss new starting jobs if they start end end within the execution time of the script.
|
||||
* `--direction` Mostly a debug option. Only synchronize starting or stopping jobs. The default is both directions.
|
||||
|
||||
The script terminates after synchronization of all jobs.
|
||||
|
||||
# Getting help
|
||||
|
||||
This script is to be seen as an example implementation and may have to be adapted for other installations. I tried to keep the script as general as possible and to catch some differences between clusters already. If adjustments are necessary, I am happy about pull requests or notification about that on other ways to get an implementation that runs on as many systems as possible without adjustments in the long run.
|
@ -1,30 +0,0 @@
|
||||
{
|
||||
"slurm" : {
|
||||
"squeue": "/usr/bin/squeue",
|
||||
"sacct": "/usr/bin/sacct",
|
||||
"state_save_location" : "/var/spool/SLURM/StateSaveLocation"
|
||||
},
|
||||
"cc-backend" : {
|
||||
"host" : "https://some.cc.instance",
|
||||
"apikey" : "<jwt token>"
|
||||
},
|
||||
"accelerators" : {
|
||||
"n2gpu" : {
|
||||
"0": "00000000:03:00.0",
|
||||
"1": "00000000:44:00.0",
|
||||
"2": "00000000:84:00.0",
|
||||
"3": "00000000:C4:00.0"
|
||||
},
|
||||
"n2dgx" : {
|
||||
"0": "00000000:07:00.0",
|
||||
"1": "00000000:0F:00.0",
|
||||
"2": "00000000:47:00.0",
|
||||
"3": "00000000:4E:00.0",
|
||||
"4": "00000000:87:00.0",
|
||||
"5": "00000000:90:00.0",
|
||||
"6": "00000000:B7:00.0",
|
||||
"7": "00000000:BD:00.0"
|
||||
}
|
||||
},
|
||||
"node_regex" : "^(n2(lcn|cn|fpga|gpu)[\\d{2,4}\\,\\-\\[\\]]+)+$"
|
||||
}
|
@ -1,545 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
# This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
# the slurm command line tools to gather the relevant slurm infos and reads
|
||||
# the corresponding info from cluster cockpit via its api.
|
||||
#
|
||||
# After reading the data, it stops all jobs in cluster cockpit which are
|
||||
# not running any more according to slurm and afterwards it creates all new
|
||||
# running jobs in cluster cockpit.
|
||||
#
|
||||
# -- Michael Schwarz <schwarz@uni-paderborn.de>
|
||||
|
||||
from dateutil import parser as dateparser
|
||||
import platform
|
||||
import subprocess
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
import re
|
||||
import tailf
|
||||
|
||||
|
||||
class CCApi:
|
||||
config = {}
|
||||
apiurl = ''
|
||||
apikey = ''
|
||||
headers = {}
|
||||
debug = False
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.apiurl = "%s/api/" % config['cc-backend']['host']
|
||||
self.apikey = config['cc-backend']['apikey']
|
||||
self.headers = {'accept': 'application/ld+json',
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
|
||||
self.debug = debug
|
||||
|
||||
def startJob(self, data):
|
||||
url = self.apiurl+"jobs/start_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 201:
|
||||
return r.json()
|
||||
elif r.status_code == 422:
|
||||
if self.debug:
|
||||
print(data)
|
||||
print(r.status_code, r.content)
|
||||
return False
|
||||
else:
|
||||
print(data)
|
||||
print(r)
|
||||
print(r.json())
|
||||
return False
|
||||
|
||||
def stopJob(self, data):
|
||||
url = self.apiurl+"jobs/stop_job/"
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
print(data)
|
||||
print(r.status_code, r.content)
|
||||
return False
|
||||
|
||||
def getJobs(self, filter_running=True):
|
||||
url = self.apiurl+"jobs/"
|
||||
if filter_running:
|
||||
url = url+"?state=running"
|
||||
r = requests.get(url, headers=self.headers)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
return {'jobs': []}
|
||||
|
||||
|
||||
class CondorSync:
|
||||
condorJobData = {}
|
||||
ccData = {}
|
||||
config = {}
|
||||
debug = False
|
||||
ccapi = None
|
||||
submit_node = ''
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.debug = debug
|
||||
if 'submitnode' in config['htcondor']:
|
||||
self.submit_node = config['htcondor']['submitnode']
|
||||
else:
|
||||
self.submit_node = platform.node()
|
||||
|
||||
# validate config TODO
|
||||
if "htcondor" not in config:
|
||||
raise KeyError
|
||||
if "eventlog" not in config['htcondor']:
|
||||
raise KeyError
|
||||
if "cc-backend" not in config:
|
||||
raise KeyError
|
||||
if "host" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
if "apikey" not in config['cc-backend']:
|
||||
raise KeyError
|
||||
|
||||
self.ccapi = CCApi(self.config, debug)
|
||||
|
||||
def _exec(self, command):
|
||||
process = subprocess.Popen(
|
||||
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
output, error = process.communicate()
|
||||
if process.returncode == 0:
|
||||
return output.decode('utf-8')
|
||||
else:
|
||||
print("Error: ", error)
|
||||
return ""
|
||||
|
||||
def _readCondorData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readCondorData called")
|
||||
with open(self.config['htcondor']['eventlog']) as f:
|
||||
self.condorJobData = json.load(f)
|
||||
|
||||
def _readCCData(self):
|
||||
if self.debug:
|
||||
print("DEBUG: _readCCBackendData called")
|
||||
self.ccData = self.ccapi.getJobs()
|
||||
if self.debug:
|
||||
print("DEBUG: ccData:", self.ccData)
|
||||
|
||||
def _getAccDataForJob(self, jobid):
|
||||
raise NotImplementedError
|
||||
command = "%s -j %s --json" % (self.config['slurm']['sacct'], jobid)
|
||||
return json.loads(self._exec(command))
|
||||
|
||||
def _getSubmitNodeId(self, globalJobId):
|
||||
job_id_parts = globalJobId.split('#')
|
||||
return self.config['htcondor']['submitnodes'][job_id_parts[0].split('.')[0]]
|
||||
|
||||
def _jobIdInCC(self, jobid):
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid == job['jobId']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _jobIdToInt(self, globalJobId):
|
||||
job_id_parts = globalJobId.split('#')
|
||||
cluster_id, proc_id = [int(id) for id in job_id_parts[1].split('.')]
|
||||
return cluster_id << 32 | ((proc_id & 0x3FFFFFFF) << 2) | self._getSubmitNodeId(globalJobId)
|
||||
|
||||
def _jobRunning(self, jobid):
|
||||
for job in self.slurmJobData['jobs']:
|
||||
if int(job['job_id']) == int(jobid):
|
||||
if job['job_state'] == 'RUNNING':
|
||||
return True
|
||||
return False
|
||||
|
||||
def _getACCIDsFromGRES(self, gres, nodename):
|
||||
ids = self.config['accelerators']
|
||||
|
||||
nodetype = None
|
||||
for k, v in ids.items():
|
||||
if nodename.startswith(k):
|
||||
nodetype = k
|
||||
|
||||
if not nodetype:
|
||||
print("WARNING: Can't find accelerator definition for node %s" %
|
||||
nodename.strip())
|
||||
return []
|
||||
|
||||
# the gres definition might be different on other clusters!
|
||||
m = re.match(r"(fpga|gpu):(\w+):(\d)\(IDX:([\d,\-]+)\)", gres)
|
||||
if m:
|
||||
family = m.group(1)
|
||||
type = m.group(2)
|
||||
amount = m.group(3)
|
||||
indexes = m.group(4)
|
||||
acc_id_list = []
|
||||
|
||||
# IDX might be: IDX:0,2-3
|
||||
# first split at , then expand the range to individual items
|
||||
if len(indexes) > 1:
|
||||
idx_list = indexes.split(',')
|
||||
idx = []
|
||||
for i in idx_list:
|
||||
if len(i) == 1:
|
||||
idx.append(i)
|
||||
else:
|
||||
start = i.split('-')[0]
|
||||
end = i.split('-')[1]
|
||||
idx = idx + list(range(int(start), int(end)+1))
|
||||
indexes = idx
|
||||
|
||||
for i in indexes:
|
||||
acc_id_list.append(ids[nodetype][str(i)])
|
||||
|
||||
print(acc_id_list)
|
||||
return acc_id_list
|
||||
|
||||
return []
|
||||
|
||||
def _ccStartJob(self, job):
|
||||
print("INFO: Create job %s, user %s, name %s" %
|
||||
(job['GlobalJobId'], job['Owner'], job['JobBatchName'] if 'JobBatchName' in job else ''))
|
||||
jobId = self._jobIdToInt(job['GlobalJobId'])
|
||||
# for j in self.ccData['jobs']:
|
||||
# if j['jobId'] == jobId:
|
||||
# return
|
||||
|
||||
nodelist = [job['RemoteHost'].split('@')[1]]
|
||||
|
||||
# # Exclusive job?
|
||||
# if job['shared'] == "none":
|
||||
# exclusive = 1
|
||||
# # exclusive to user
|
||||
# elif job['shared'] == "user":
|
||||
# exclusive = 2
|
||||
# # exclusive to mcs
|
||||
# elif job['shared'] == "mcs":
|
||||
# exclusive = 3
|
||||
# # default is shared node
|
||||
# else:
|
||||
exclusive = 0
|
||||
|
||||
# read job script and environment
|
||||
# hashdir = "hash.%s" % str(job['GlobalJobId'])[-1]
|
||||
# jobscript_filename = "%s/%s/job.%s/script" % (
|
||||
# self.config['slurm']['state_save_location'], hashdir, job['GlobalJobId'])
|
||||
# jobscript = ''
|
||||
# try:
|
||||
# with open(jobscript_filename, 'r', encoding='utf-8') as f:
|
||||
# jobscript = f.read()
|
||||
# except FileNotFoundError:
|
||||
# jobscript = 'NO JOBSCRIPT'
|
||||
|
||||
environment = ''
|
||||
# FIXME sometimes produces utf-8 conversion errors
|
||||
# environment_filename = "%s/%s/job.%s/environment" % (
|
||||
# self.config['slurm']['state_save_location'], hashdir, job['GlobalJobId'])
|
||||
# try:
|
||||
# with open(environment_filename, 'r', encoding='utf-8') as f:
|
||||
# environment = f.read()
|
||||
# except FileNotFoundError:
|
||||
# environment = 'NO ENV'
|
||||
# except UnicodeDecodeError:
|
||||
# environment = 'UNICODE_DECODE_ERROR'
|
||||
|
||||
# truncate environment to 50.000 chars. Otherwise it might not fit into the
|
||||
# Database field together with the job script etc.
|
||||
# environment = environment[:50000]
|
||||
|
||||
# # get additional info from slurm and add environment
|
||||
# command = "scontrol show job %s" % job['GlobalJobId']
|
||||
# slurminfo = self._exec(command)
|
||||
# slurminfo = slurminfo + "ENV:\n====\n" + environment
|
||||
|
||||
if job['Subproc'] > 0:
|
||||
print("WARNING: did not expect to see Subproc != 0")
|
||||
|
||||
# build payload
|
||||
data = {'jobId': jobId,
|
||||
'user': job['Owner'],
|
||||
'cluster': self.config['cluster'],
|
||||
'numNodes': job['CurrentHosts'],
|
||||
'numHwthreads': job['CpusProvisioned'],
|
||||
'startTime': job['JobCurrentStartDate'],
|
||||
# 'walltime': int(job['time_limit']) * 60,
|
||||
'project': job['AccountingGroup'],
|
||||
'partition': 'main', # job['partition'],
|
||||
'exclusive': exclusive,
|
||||
'resources': [],
|
||||
'metadata': {
|
||||
'jobName': job['JobBatchName'] if 'JobBatchName' in job else ''
|
||||
}
|
||||
}
|
||||
|
||||
# is this part of an array job?
|
||||
if job['Cluster'] > 0:
|
||||
data.update(
|
||||
{"arrayJobId": job['Cluster'] * 10 + self._getSubmitNodeId(job['GlobalJobId'])})
|
||||
|
||||
num_acc = 0
|
||||
for node in nodelist:
|
||||
# begin dict
|
||||
resources = {'hostname': node.split('.')[0].strip()}
|
||||
|
||||
# if a job uses a node exclusive, there are some assigned cpus (used by this job)
|
||||
# and some unassigned cpus. In this case, the assigned_cpus are those which have
|
||||
# to be monitored, otherwise use the unassigned cpus.
|
||||
# hwthreads = job['job_resources']['allocated_nodes'][str(
|
||||
# i)]['cores']
|
||||
# cores_assigned = []
|
||||
# cores_unassigned = []
|
||||
# for k, v in hwthreads.items():
|
||||
# if v == "assigned":
|
||||
# cores_assigned.append(int(k))
|
||||
# else:
|
||||
# cores_unassigned.append(int(k))
|
||||
|
||||
# if len(cores_assigned) > 0:
|
||||
# cores = cores_assigned
|
||||
# else:
|
||||
# cores = cores_unassigned
|
||||
# resources.update({"hwthreads": cores})
|
||||
|
||||
# Get allocated GPUs if some are requested
|
||||
if 'AssignedGPUs' in job:
|
||||
|
||||
acc_ids = job['AssignedGPUs'].split(',')
|
||||
if len(acc_ids) > 0:
|
||||
num_acc = num_acc + len(acc_ids)
|
||||
resources.update(
|
||||
{"accelerators": [self.config['accelerators'][node][id] for id in acc_ids]})
|
||||
|
||||
data['resources'].append(resources)
|
||||
|
||||
# if the number of accelerators has changed in the meantime, upate this field
|
||||
data.update({"numAcc": num_acc})
|
||||
|
||||
if self.debug:
|
||||
print(data)
|
||||
|
||||
ccjob = self.ccapi.startJob(data)
|
||||
|
||||
def _ccStopJob(self, job):
|
||||
if 'GlobalJobID' in job:
|
||||
globalJobId = job['GlobalJobId']
|
||||
else:
|
||||
globalJobId = "%s#%d.%d#%d" % (
|
||||
self.submit_node, job['Cluster'], job['Proc'], int(time.time()))
|
||||
print("INFO: Stop job %s" % globalJobId)
|
||||
jobId = self._jobIdToInt(globalJobId)
|
||||
|
||||
# get search for the jobdata stored in CC
|
||||
# ccjob = {}
|
||||
# for j in self.ccData['jobs']:
|
||||
# if j['jobId'] == jobId:
|
||||
# ccjob = j
|
||||
|
||||
# # check if job is still in squeue data
|
||||
# for job in self.slurmJobData['jobs']:
|
||||
# if job['job_id'] == jobId:
|
||||
# jobstate = job['job_state'].lower()
|
||||
# endtime = job['end_time']
|
||||
# if jobstate == 'requeued':
|
||||
# print("Requeued job")
|
||||
# jobstate = 'failed'
|
||||
|
||||
# if int(ccjob['startTime']) >= int(job['end_time']):
|
||||
# print("squeue correction")
|
||||
# # For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# # an earlier end time in squee than the starting time in CC. If this is the
|
||||
# # case, take the starting time from CC and add ten seconds to the starting
|
||||
# # time as new end time. Otherwise CC refuses to end the job.
|
||||
# endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
# else:
|
||||
# jobsAcctData = self._getAccDataForJob(jobid)['jobs']
|
||||
# for j in jobsAcctData:
|
||||
# if len(j['steps']) > 0 and j['steps'][0]['time']['start'] == ccjob['startTime']:
|
||||
# jobAcctData = j
|
||||
# jobstate = jobAcctData['state']['current'].lower()
|
||||
# endtime = jobAcctData['time']['end']
|
||||
|
||||
# if jobstate == "node_fail":
|
||||
# jobstate = "failed"
|
||||
# if jobstate == "requeued":
|
||||
# print("Requeued job")
|
||||
# jobstate = "failed"
|
||||
|
||||
# if int(ccjob['startTime']) >= int(jobAcctData['time']['end']):
|
||||
# print("sacct correction")
|
||||
# # For some reason (needs to get investigated), failed jobs sometimes report
|
||||
# # an earlier end time in squee than the starting time in CC. If this is the
|
||||
# # case, take the starting time from CC and add ten seconds to the starting
|
||||
# # time as new end time. Otherwise CC refuses to end the job.
|
||||
# endtime = int(ccjob['startTime']) + 1
|
||||
|
||||
jobstate_map = {4: "cancelled", 5: "completed", 7: "failed", 9: "cancelled",
|
||||
10: "stopped", 12: "stopped", 24: "failed"}
|
||||
jobstate = jobstate_map[job['TriggerEventTypeNumber']]
|
||||
|
||||
data = {
|
||||
'jobId': jobId,
|
||||
'cluster': self.config['cluster'],
|
||||
'jobState': jobstate
|
||||
}
|
||||
if 'ToE' in job:
|
||||
if isinstance(job['ToE']['When'], int):
|
||||
data['stopTime'] = job['ToE']['When']
|
||||
else:
|
||||
data['stopTime'] = int(time.mktime(
|
||||
dateparser.parse(job['ToE']['When']).timetuple()))
|
||||
else:
|
||||
data['stopTime'] = int(time.mktime(
|
||||
dateparser.parse(job['EventTime']).timetuple()))
|
||||
|
||||
if 'JobCurrentStartDate' in job:
|
||||
data['startTime'] = job['JobCurrentStartDate']
|
||||
|
||||
if self.debug:
|
||||
print(data)
|
||||
|
||||
self.ccapi.stopJob(data)
|
||||
|
||||
def _convertNodelist(self, nodelist):
|
||||
# Use slurm to convert a nodelist with ranges into a comma separated list of unique nodes
|
||||
if re.search(self.config['node_regex'], nodelist):
|
||||
command = "scontrol show hostname %s | paste -d, -s" % nodelist
|
||||
retval = self._exec(command).split(',')
|
||||
return retval
|
||||
else:
|
||||
return []
|
||||
|
||||
def _handleEvent(self, event):
|
||||
# event codes: https://htcondor.readthedocs.io/en/latest/codes-other-values/job-event-log-codes.html
|
||||
if event['EventTypeNumber'] == 28: # JobAdInformationEvent
|
||||
if event['TriggerEventTypeNumber'] == 1: # Execute
|
||||
self._ccStartJob(event)
|
||||
elif event['TriggerEventTypeNumber'] == 4 or event['TriggerEventTypeNumber'] == 5 or \
|
||||
event['TriggerEventTypeNumber'] == 7 or event['TriggerEventTypeNumber'] == 9 or \
|
||||
event['TriggerEventTypeNumber'] == 10 or event['TriggerEventTypeNumber'] == 12 or \
|
||||
event['TriggerEventTypeNumber'] == 24:
|
||||
self._ccStopJob(event)
|
||||
|
||||
def sync(self, limit=200, jobid=None, direction='both'):
|
||||
if self.debug:
|
||||
print("DEBUG: sync called")
|
||||
print("DEBUG: jobid %s" % jobid)
|
||||
|
||||
# self._readCCData()
|
||||
|
||||
with tailf.Tail(self.config['htcondor']['eventlog']) as tail:
|
||||
remaining = ""
|
||||
while True:
|
||||
for event in tail:
|
||||
if isinstance(event, bytes):
|
||||
eventlog = remaining + event.decode("utf-8")
|
||||
decoder = json.JSONDecoder()
|
||||
pos = 0
|
||||
while True:
|
||||
try:
|
||||
event, pos = decoder.raw_decode(eventlog, pos)
|
||||
remaining = ""
|
||||
pos += 1
|
||||
self._handleEvent(event)
|
||||
except json.JSONDecodeError:
|
||||
remaining = eventlog[pos:]
|
||||
break
|
||||
elif event is tailf.Truncated:
|
||||
print("File was truncated")
|
||||
else:
|
||||
assert False, "unreachable" # currently. more events may be introduced later
|
||||
time.sleep(5) # save CPU cycles
|
||||
|
||||
with open(self.config['htcondor']['eventlog'], 'r', encoding='utf-8') as f:
|
||||
eventlog = f.read()
|
||||
|
||||
decoder = json.JSONDecoder()
|
||||
pos = 0
|
||||
while True:
|
||||
try:
|
||||
event, pos = decoder.raw_decode(eventlog, pos)
|
||||
pos += 1
|
||||
self._handleEvent(event)
|
||||
except json.JSONDecodeError:
|
||||
break
|
||||
# self._readCondorData()
|
||||
return
|
||||
|
||||
# Abort after a defined count of sync actions. The intend is, to restart this script after the
|
||||
# limit is reached. Otherwise, if many many jobs get stopped, the script might miss some new jobs.
|
||||
sync_count = 0
|
||||
|
||||
# iterate over cc jobs and stop them if they have already ended
|
||||
if direction in ['both', 'stop']:
|
||||
for job in self.ccData['jobs']:
|
||||
if jobid:
|
||||
if int(job['jobId']) == int(jobid) and not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobRunning(job['jobId']):
|
||||
self._ccStopJob(job['jobId'])
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
sync_count = 0
|
||||
# iterate over running jobs and add them to cc if they are still missing there
|
||||
if direction in ['both', 'start']:
|
||||
for job in self.slurmJobData['jobs']:
|
||||
# Skip this job if the user does not want the metadata of this job to be submitted to ClusterCockpit
|
||||
# The text field admin_comment is used for this. We assume that this field contains a comma seperated
|
||||
# list of flags.
|
||||
if "disable_cc_submission" in job['admin_comment'].split(','):
|
||||
print(
|
||||
"INFO: Job %s: disable_cc_sumbission is set. Continue with next job" % job['job_id'])
|
||||
continue
|
||||
# consider only running jobs
|
||||
if job['job_state'] == "RUNNING":
|
||||
if jobid:
|
||||
if int(job['job_id']) == int(jobid) and not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
else:
|
||||
if not self._jobIdInCC(job['job_id']):
|
||||
self._ccStartJob(job)
|
||||
sync_count = sync_count + 1
|
||||
if sync_count >= limit:
|
||||
print("INFO: sync limit (%s) reached" % limit)
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
the slurm command line tools to gather the relevant slurm infos and reads
|
||||
the corresponding info from cluster cockpit via its api.
|
||||
|
||||
After reading the data, it stops all jobs in cluster cockpit which are
|
||||
not running any more according to slurm and afterwards it creates all new
|
||||
running jobs in cluster cockpit.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description=about)
|
||||
parser.add_argument(
|
||||
"-c", "--config", help="Read config file. Default: config.json", default="config.json")
|
||||
parser.add_argument(
|
||||
"-d", "--debug", help="Enable debug output", action="store_true")
|
||||
parser.add_argument("-j", "--jobid", help="Sync this jobid")
|
||||
parser.add_argument(
|
||||
"-l", "--limit", help="Stop after n sync actions in each direction. Default: 200", default="200", type=int)
|
||||
parser.add_argument("--direction", help="Only sync in this direction",
|
||||
default="both", choices=['both', 'start', 'stop'])
|
||||
args = parser.parse_args()
|
||||
|
||||
# open config file
|
||||
if args.debug:
|
||||
print("DEBUG: load config file: %s" % args.config)
|
||||
with open(args.config, 'r', encoding='utf-8') as f:
|
||||
config = json.load(f)
|
||||
if args.debug:
|
||||
print("DEBUG: config file contents:")
|
||||
print(config)
|
||||
|
||||
s = CondorSync(config, args.debug)
|
||||
s.sync(args.limit, args.jobid, args.direction)
|
@ -1,13 +0,0 @@
|
||||
--- slurm-21.08.6_bak/src/plugins/openapi/v0.0.37/jobs.c 2022-02-24 20:10:29.000000000 +0100
|
||||
+++ slurm-21.08.6/src/plugins/openapi/v0.0.37/jobs.c 2022-07-14 15:02:59.492972019 +0200
|
||||
@@ -773,9 +773,7 @@
|
||||
(i /
|
||||
j->cores_per_socket[sock_inx]));
|
||||
data_t *core = data_key_set_int(
|
||||
- cores,
|
||||
- (i %
|
||||
- j->cores_per_socket[sock_inx]));
|
||||
+ cores, i);
|
||||
|
||||
if (bit_test(j->core_bitmap_used,
|
||||
bit_inx)) {
|
@ -1 +0,0 @@
|
||||
tailf==0.2.5
|
70
scripts/stop-jobs.py
Normal file
70
scripts/stop-jobs.py
Normal file
@ -0,0 +1,70 @@
|
||||
#!/usr/bin/python3
|
||||
import time
|
||||
from dateutil import parser as dateparser
|
||||
import requests
|
||||
import json
|
||||
|
||||
|
||||
class CCApi:
|
||||
config = {}
|
||||
apiurl = ''
|
||||
apikey = ''
|
||||
headers = {}
|
||||
debug = False
|
||||
|
||||
def __init__(self, config, debug=False):
|
||||
self.config = config
|
||||
self.apiurl = "%s/api/" % config['cc-backend']['host']
|
||||
self.apikey = config['cc-backend']['apikey']
|
||||
self.headers = {'accept': 'application/ld+json',
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer %s' % self.config['cc-backend']['apikey']}
|
||||
self.debug = debug
|
||||
|
||||
def stopJob(self, id, data):
|
||||
url = self.apiurl+"jobs/stop_job/%d" % id
|
||||
r = requests.post(url, headers=self.headers, json=data)
|
||||
if r.status_code == 200:
|
||||
return r.json()
|
||||
else:
|
||||
print(data)
|
||||
print(r.status_code, r.content)
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
about = """This script syncs the slurm jobs with the cluster cockpit backend. It uses
|
||||
the slurm command line tools to gather the relevant slurm infos and reads
|
||||
the corresponding info from cluster cockpit via its api.
|
||||
|
||||
After reading the data, it stops all jobs in cluster cockpit which are
|
||||
not running any more according to slurm and afterwards it creates all new
|
||||
running jobs in cluster cockpit.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description=about)
|
||||
parser.add_argument(
|
||||
"-c", "--config", help="Read config file. Default: config.json", default="config.json")
|
||||
parser.add_argument(
|
||||
"-j", "--jobs", help="Read job file file. Default: tobestopped.json", default="tobestopped.json")
|
||||
parser.add_argument(
|
||||
"-d", "--debug", help="Enable debug output", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.config, 'r', encoding='utf-8') as f:
|
||||
config = json.load(f)
|
||||
|
||||
cc = CCApi(config, args.debug)
|
||||
with open("tobestopped.json") as f:
|
||||
jobs = json.load(f)['data']['jobs']['items']
|
||||
|
||||
for job in jobs:
|
||||
startTime = int(time.mktime(dateparser.parse(job['startTime']).timetuple()))
|
||||
data = {
|
||||
"jobState": "cancelled",
|
||||
"stopTime": startTime+1,
|
||||
"cluster": job['cluster'],
|
||||
"jobId": job['jobId'],
|
||||
"startTime": startTime
|
||||
}
|
||||
cc.stopJob(job['id'], data)
|
||||
|
Loading…
Reference in New Issue
Block a user