diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7c5f371..7fa25ba 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -42,3 +42,5 @@ target_include_directories(htcondor_cc_sync_plugin PRIVATE ${CMAKE_SOURCE_DIR}/c
 target_compile_options(htcondor_cc_sync_plugin PRIVATE -DCONDOR_VERSION=\"10.0.0\" -DENABLE_STATE_DUMP -DGLIBC219=GLIBC219 -DGLIBC=GLIBC -DHAVE_CONFIG_H -DLINUX=\"LINUX_3.13.0-30-GENERIC\" -DPLATFORM=\"X86_64-Linux_17\" -DPRE_RELEASE_STR=\"\" -DWITH_OPENSSL -DX86_64=X86_64 -Dcondorapi_shared_EXPORTS -DWITH_IPV6)
+
+target_compile_definitions(htcondor_cc_sync_plugin PRIVATE $<$,$>:VERBOSE>)
diff --git a/ReadMe.md b/ReadMe.md
index 015e26a..e85c21e 100644
--- a/ReadMe.md
+++ b/ReadMe.md
@@ -38,3 +38,5 @@ CCSYNC_SUBMIT_ID=
 int {
           // todo: eh.. if we need a length, are we potentially reading
           // oob??
-          dprintf(D_ALWAYS, "%d: %s\n", type, ptr);
+          dprintf(D_VERBOSE, "%d: %s\n", type, ptr);
           return length;
         }));
-    // request.setOpt(curlpp::options::Verbose(true));
+    request.setOpt(curlpp::options::Verbose(true));
+#endif
     request.perform();
-    dprintf(D_ALWAYS, "response: %s\n", buf.str().c_str());
+    dprintf(D_VERBOSE, "response: %s\n", buf.str().c_str());
   } catch (const curlpp::LogicError &e) {
     dprintf(D_ALWAYS, "LogicError: %s\n", e.what());
   } catch (const curlpp::RuntimeError &e) {
@@ -364,41 +365,6 @@ void CCSyncPlugin::sendPostRequest(const std::string &route,
   }
 }
 
-// std::unordered_map parseToE(const std::string &toe)
-// {
-//   // toe = "[ Who = "itself"; How = "OF_ITS_OWN_ACCORD"; ExitCode = 2;
-//   HowCode =
-//   // 0; When = 1671108732; ExitBySignal = false ]";
-//   auto start = toe.find_first_not_of("[ ");
-//   auto stop = toe.find_last_not_of("] ");
-//   std::istringstream toeStream{toe.substr(start + 1, stop - start)};
-
-//   std::unordered_map map;
-
-//   std::string element;
-//   while (std::getline(toeStream, element, ';')) {
-//     auto splitter = element.find(" = ");
-//     auto key = element.substr(0, splitter);
-//     auto value = element.substr(splitter + sizeof(" = "));
-//     map.emplace(key, value);
-//     dprintf(D_ALWAYS, "ToE field '%s': '%s'\n", key.c_str(), value.c_str());
-//   }
-
-//   return map;
-// }
-
-// std::uint64_t getEndTime(const CondorJob &job) {
-//   if (auto toeIt = job.find("ToE"); toeIt != job.end()) {
-//     auto toe = parseToE(toeIt->second);
-//     if (auto whenIt = toe.find("When"); whenIt != toe.end())
-//       return std::stoull(whenIt->second); // todo: always an int here?
-//   } else if (auto enteredStateTimeIt = job.find("EnteredCurrentStatus");
-//              enteredStateTimeIt != job.end()) {
-//     return std::stoull(enteredStateTimeIt->second);
-//   }
-//   return 0;
-// }
-
 void CCSyncPlugin::endTransaction() {
   if (initializing) { // HTCondor bug? We should'n be receiving events before
                       // initialization is done. We will ignore them if it
@@ -411,70 +377,78 @@ void CCSyncPlugin::endTransaction() {
   // dprintf(D_FULLDEBUG,"Ending transaction.\n");
 
   for (auto jobId : toConsider) {
-    if (jobId.second == -1)
-      continue; // don't care about the "template"
+    try {
+      if (jobId.second == -1)
+        continue; // don't care about the "template"
 
-    auto &job = currentStatus[jobId.first][jobId.second];
-    printJobStatus(jobId, job);
-    if (job.empty() || !jobHasRequiredClassAds(job))
-      continue; // not yet ready
-
-    dprintf(D_ALWAYS, "JobStatus: %s\n", job["JobStatus"].c_str());
-    int state = std::stoi(job["JobStatus"]), lastState;
+      auto &job = currentStatus[jobId.first][jobId.second];
+#ifdef VERBOSE
+      printJobStatus(jobId, job);
+#endif
+      if (job.empty() || !jobHasRequiredClassAds(job))
+        continue; // not yet ready
 
-    if (auto lastStateIt = job.find("LastJobStatus");
-        lastStateIt != job.end()) {
-      dprintf(D_ALWAYS, "LastJobStatus: %s\n", lastStateIt->second.c_str());
-      if (lastState = std::stoi(lastStateIt->second); lastState == state)
+      dprintf(D_VERBOSE, "JobStatus: %s\n", job["JobStatus"].c_str());
+      int state = std::stoi(job["JobStatus"]), lastState;
+
+      if (auto lastStateIt = job.find("LastJobStatus");
+          lastStateIt != job.end()) {
+        dprintf(D_VERBOSE, "LastJobStatus: %s\n", lastStateIt->second.c_str());
+        if (lastState = std::stoi(lastStateIt->second); lastState == state)
+          continue;
+      } else
         continue;
-    } else
-      continue;
 
-    if (state == RUNNING) {
-      auto jobBatchNameIt = job.find("JobBatchName");
-      auto globalJobId = globalJobIdToInt(jobId);
+      if (state == RUNNING) {
+        auto jobBatchNameIt = job.find("JobBatchName");
+        auto globalJobId = globalJobIdToInt(jobId);
 
-      auto hostname = getRemoteHost(job["RemoteHost"]);
-      json resources = {{"hostname", hostname}};
-      auto accs = getAccelerators(job, hostname);
-      if (!accs.empty()) {
-        resources["accelerators"] = accs;
+        auto hostname = getRemoteHost(job["RemoteHost"]);
+        json resources = {{"hostname", hostname}};
+        auto accs = getAccelerators(job, hostname);
+        if (!accs.empty()) {
+          resources["accelerators"] = accs;
+        }
+
+        json j = {
+            {"jobId", globalJobId},
+            {"arrayJobId", jobId.first * 10 + submitNodeId},
+            {"user", removeQuotes(job["Owner"])},
+            {"cluster", clusterName},
+            {"numNodes", std::stoi(job["CurrentHosts"])},
+            {"numHwthreads", std::stoi(job["CpusProvisioned"])},
+            {"startTime", std::stoull(job["EnteredCurrentStatus"])},
+            {"project", removeQuotes(job["AcctGroup"])},
+            {"partition", "main"},
+            {"exclusive", 0},
+            {"resources", json::array({resources})},
+            {"numAcc", accs.size()},
+            {"metadata",
+             {{"jobName",
+               jobBatchNameIt != job.end() ? jobBatchNameIt->second : ""}}}};
+
+        sendPostRequest("/api/jobs/start_job/", j.dump());
+
+      } else if (lastState == RUNNING &&
+                 (state == REMOVED || state == COMPLETED || state == HELD ||
+                  state == SUSPENDED)) {
+
+        std::uint64_t startTime = std::stoull(job["JobCurrentStartDate"]),
+                      stopTime = std::stoull(job["EnteredCurrentStatus"]);
+
+        json j = {{"jobId", globalJobIdToInt(jobId)},
+                  {"cluster", clusterName},
+                  {"jobState", jobStateMap.at(state)},
+                  {"startTime", startTime},
+                  {"stopTime",
+                   (stopTime - startTime > 10) ? stopTime : startTime + 10}};
+
+        sendPostRequest("/api/jobs/stop_job/", j.dump());
       }
-
-      json j = {
-          {"jobId", globalJobId},
-          {"arrayJobId", jobId.first * 10 + submitNodeId},
-          {"user", removeQuotes(job["Owner"])},
-          {"cluster", clusterName},
-          {"numNodes", std::stoi(job["CurrentHosts"])},
-          {"numHwthreads", std::stoi(job["CpusProvisioned"])},
-          {"startTime", std::stoull(job["EnteredCurrentStatus"])},
-          {"project", removeQuotes(job["AcctGroup"])},
-          {"partition", "main"},
-          {"exclusive", 0},
-          {"resources", json::array({resources})},
-          {"numAcc", accs.size()},
-          {"metadata",
-           {{"jobName",
-             jobBatchNameIt != job.end() ? jobBatchNameIt->second : ""}}}};
-
-      sendPostRequest("/api/jobs/start_job/", j.dump());
-
-    } else if (lastState == RUNNING &&
-               (state == REMOVED || state == COMPLETED || state == HELD ||
-                state == SUSPENDED)) {
-
-      std::uint64_t startTime = std::stoull(job["JobCurrentStartDate"]),
-                    stopTime = std::stoull(job["EnteredCurrentStatus"]);
-
-      json j = {{"jobId", globalJobIdToInt(jobId)},
-                {"cluster", clusterName},
-                {"jobState", jobStateMap.at(state)},
-                {"startTime", startTime},
-                {"stopTime",
-                 (stopTime - startTime > 10) ? stopTime : startTime + 10}};
-
-      sendPostRequest("/api/jobs/stop_job/", j.dump());
+    } catch (const std::exception &e) {
+      dprintf(D_ALWAYS, "exception: %s\n", e.what());
+    } catch (...) {
+      dprintf(D_ALWAYS, "unknown exception\n");
     }
   }