Don't start on JobStatus change & stop on RUNNING -> IDLE.

This handles edge cases where a startd died and the job is requeued after a timeout of 1 hour.
On the subsequent IDLE -> RUNNING transition, all other attributes (like RemoteHost) are already set from the last execution.
We should wait for `CpusProvisioned`, though, so that all required values are already up-to-date.

Additionally, since timed-out jobs are not held, but instead just requeued, stop jobs as well when the state changes from `RUNNING` to `IDLE`.
This commit is contained in:
Joachim Meyer 2022-12-19 11:24:24 +01:00
parent 6128b58cbd
commit e38e6fa5b9

View File

@ -40,11 +40,8 @@ using namespace std;
using namespace nlohmann; using namespace nlohmann;
static const std::unordered_map<int, std::string> jobStateMap = { static const std::unordered_map<int, std::string> jobStateMap = {
{RUNNING, "running"}, {IDLE, "failed"}, {RUNNING, "running"}, {REMOVED, "cancelled"},
{REMOVED, "cancelled"}, {HELD, "failed"}, {COMPLETED, "completed"}, {SUSPENDED, "stopped"}};
{HELD, "failed"},
{COMPLETED, "completed"},
{SUSPENDED, "stopped"}};
void CCSyncPlugin::earlyInitialize() { void CCSyncPlugin::earlyInitialize() {
lock_guard<std::mutex> guard(data_mutex); lock_guard<std::mutex> guard(data_mutex);
@ -186,13 +183,19 @@ void CCSyncPlugin::setAttribute(const char *key, const char *_name,
for (auto &jobEntry : currentStatus[clusterId]) { for (auto &jobEntry : currentStatus[clusterId]) {
int actualProcId = jobEntry.first; int actualProcId = jobEntry.first;
currentStatus[clusterId][actualProcId][name] = value; currentStatus[clusterId][actualProcId][name] = value;
// dprintf(D_FULLDEBUG,"[%d.%d] ", clusterId, actualProcId); // dprintf(D_FULLDEBUG, "[%d.%d] %s: %s\n", clusterId, actualProcId,
// name.c_str(), value.c_str());
if (actualProcId != -1) { if (actualProcId != -1) {
// dprintf(D_FULLDEBUG,"{%d.%d} ", clusterId, actualProcId); // dprintf(D_FULLDEBUG,"{%d.%d} ", clusterId, actualProcId);
if (!initializing) { // HTCondor bug? We should'n be receiving events if (!initializing) { // HTCondor bug? We should'n be receiving events
// before initialization is done. We will ignore // before initialization is done. We will ignore them if it happens.
// them if it happens.
if (name == "CpusProvisioned" || name == "JobStatus") // don't trigger on change to RUNNING, since we want to wait for
// RemoteHost and CpusProvisioned to be set as well
// If a job is being requeued, this would otherwise potentially
// start a job with the wrong host / CPU count..
if (name == "CpusProvisioned" ||
(name == "JobStatus" && std::stoi(value) != RUNNING))
toConsider.push_back({clusterId, procId}); toConsider.push_back({clusterId, procId});
} }
} }
@ -202,10 +205,18 @@ void CCSyncPlugin::setAttribute(const char *key, const char *_name,
// dprintf(D_FULLDEBUG,"Assigning attribute to JobId %d.%d\n", clusterId, // dprintf(D_FULLDEBUG,"Assigning attribute to JobId %d.%d\n", clusterId,
// procId); // procId);
currentStatus[clusterId][procId][name] = value; currentStatus[clusterId][procId][name] = value;
// dprintf(D_FULLDEBUG, "[%d.%d] %s: %s\n", clusterId, procId, name.c_str(),
// value.c_str());
if (!initializing) { // HTCondor bug? We should'n be receiving events if (!initializing) { // HTCondor bug? We should'n be receiving events
// before initialization is done. We will ignore them // before initialization is done. We will ignore them
// if it happens. // if it happens.
if (name == "CpusProvisioned" || name == "JobStatus")
// don't trigger on change to RUNNING, since we want to wait for
// RemoteHost and CpusProvisioned to be set as well
// If a job is being requeued, this would otherwise potentially start a
// job with the wrong host / CPU count..
if (name == "CpusProvisioned" ||
(name == "JobStatus" && std::stoi(value) != RUNNING))
toConsider.push_back({clusterId, procId}); toConsider.push_back({clusterId, procId});
} }
} }
@ -403,7 +414,7 @@ void CCSyncPlugin::endTransaction() {
auto jobBatchNameIt = job.find("JobBatchName"); auto jobBatchNameIt = job.find("JobBatchName");
auto globalJobId = globalJobIdToInt(jobId); auto globalJobId = globalJobIdToInt(jobId);
auto hostname = getRemoteHost(job["RemoteHost"]); auto hostname = getRemoteHost(removeQuotes(job["RemoteHost"]));
json resources = {{"hostname", hostname}}; json resources = {{"hostname", hostname}};
auto accs = getAccelerators(job, hostname); auto accs = getAccelerators(job, hostname);
if (!accs.empty()) { if (!accs.empty()) {
@ -430,8 +441,8 @@ void CCSyncPlugin::endTransaction() {
sendPostRequest("/api/jobs/start_job/", j.dump()); sendPostRequest("/api/jobs/start_job/", j.dump());
} else if (lastState == RUNNING && } else if (lastState == RUNNING &&
(state == REMOVED || state == COMPLETED || state == HELD || (state == IDLE || state == REMOVED || state == COMPLETED ||
state == SUSPENDED)) { state == HELD || state == SUSPENDED)) {
std::uint64_t startTime = std::stoull(job["JobCurrentStartDate"]), std::uint64_t startTime = std::stoull(job["JobCurrentStartDate"]),
stopTime = std::stoull(job["EnteredCurrentStatus"]); stopTime = std::stoull(job["EnteredCurrentStatus"]);