cc-docker/slurm-prep-pika_v4.c

/*****************************************************************************\
* prep_script.c - PrEp script plugin, handles Prolog / Epilog /
* PrologSlurmctld / EpilogSlurmctld scripts
*****************************************************************************
* Written by Mike Arnhold and Frank Winkler
*
* LICENSE NOTICE ?!?!?!?!
\*****************************************************************************/
#include <stdarg.h>
#include <unistd.h>
#include <inttypes.h>
#include <nats/nats.h>
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/prep.h"
#include "src/common/macros.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/parse_time.h"
#include "src/common/uid.h"
#define P_NAME "PrEp-pika: "
#define BUFF_LEN 128
#define MAX_STR_LEN 10240 /* 10 KB */
const char plugin_name[] = "PrEp plugin pika";
const char plugin_type[] = "prep/pika";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
static bool have_prolog_slurmctld = false;
static bool have_epilog_slurmctld = false;
void (*prolog_slurmctld_callback)(int rc, uint32_t job_id) = NULL;
void (*epilog_slurmctld_callback)(int rc, uint32_t job_id) = NULL;
char *prefix = "JOB_RECORD__";
char *exclude_keys_prolog = "";
char *exclude_keys_epilog = "";
static pthread_mutex_t plugin_log_lock = PTHREAD_MUTEX_INITIALIZER;
typedef enum {
PROLOG = 0,
EPILOG = 1
} context_t;
/**************************
* json helper data types *
**************************/
typedef enum
{
SUCCESS = 0,
E_JSON_OUT_OF_MEMORY = -2,
E_HOST_LIST_SHIFT = -3,
E_HOST_LIST_CREATE = -4,
E_PREFIX_PREPEND = -5,
} internal_error_t;
typedef enum
{
NONE = -1,
U16,
U32,
U64,
U16PP,
CHARPP,
CHARP,
TIMEP,
} value_type_t;
typedef struct {
uint16_t **items;
uint16_t *item_counts; // common type for sub count in slurm?
uint32_t count; // common type for over all count in slurm?
} uint16_2d_t;
typedef struct {
char **items;
int count;
} char_2d_t;
typedef union
{
uint16_t uint16;
uint32_t uint32;
uint64_t uint64;
char *charp;
char_2d_t charpp;
uint16_2d_t uint16pp;
time_t *timep;
} value_t;
typedef struct
{
job_record_t *job_ptr;
char * job_name;
uint64_t mem_per_cpu;
uint64_t mem_per_node;
uint16_t **ids;
uint16_t *id_cnts;
char **node_names;
int name_cnt;
char *user;
char *job_state;
uint32_t gpu_alloc_cnt;
uint16_t smt_enabled;
char *job_script;
uint32_t core_cnt;
} prep_t;
typedef struct key_value_pair key_value_pair_t;
struct key_value_pair
{
char *key;
value_type_t type;
void (*get_value)(key_value_pair_t *, prep_t *);
value_t value;
bool collect[2];
};
key_value_pair_t *collect;
/*************************
* function declarations *
*************************/
/* Added spank-like output functions for convenience */
extern void slurm_info (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_error (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_verbose (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug2 (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug3 (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
/* Forward declarations */
static internal_error_t job_ptr_to_json(job_record_t *job_ptr, char **json,
context_t cntxt);
static const char *error_to_string(internal_error_t error);
static internal_error_t copy_key_value_pairs(key_value_pair_t *dst,
key_value_pair_t *src,
size_t size);
static int pika_metadata_log(int job_id, char *json, bool is_epilog);
/* Helper macro.
* Uses _expr (usually an expression returning a number/error)
* to either advance an offset value _o by that number or
* return the number as error _e if it is negative */
#define ADVANCE_OR_RETURN(_o, _e, _expr) \
{\
_e = _expr; \
if (_e < 0) return _e; \
else _o += _e; \
}
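/* Illustrative usage (sketch with hypothetical variables): accumulate the
* number of characters written by successive xsnprintf_realloc() calls in an
* offset counter, bailing out on the first negative return value:
*
*   int rc = 0, error = 0;
*   ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset, "["));
*   ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "],"));
*   // rc now holds the total number of characters appended
*/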
/* Anonymous function macro. Requires GNU C extensions (gcc).
* Provide a body for the key_value_pair_t get_value callback.
* The injected parameter names:
* - key_value_pair_t: kvp
* - prep_t: prep
*/
#define VALUE_CB(body) \
({ \
void __fn__ (key_value_pair_t *kvp, prep_t *prep) { body } \
__fn__; \
})
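/* Example (sketch): a collectables[] entry built with VALUE_CB, e.g.
*
*   {"TIME_LIMIT", U32, VALUE_CB(
*       kvp->value.uint32=prep->job_ptr->time_limit;
*   )},
*
* expands to a GNU C nested function whose address ends up in the
* key_value_pair_t get_value member; job_ptr_to_json() later invokes it as
* kvp->get_value(kvp, prep). */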
/**************
* plugin api *
**************/
extern int init(void)
{
slurm_info(P_NAME "init\n");
key_value_pair_t collectables[] = {
{"ARRAY_ID", U32, VALUE_CB(
kvp->value.uint32=prep->job_ptr->array_job_id;
)},
{"JOB_NAME", CHARP, VALUE_CB(
kvp->value.charp=prep->job_name;
)},
{"USER_ID", U32, VALUE_CB(
kvp->value.uint32=prep->job_ptr->user_id;
)},
{"ACCOUNT", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->account;
)},
{"WORK_DIR", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->details->work_dir;
)},
{"PARTITION", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->partition;
)},
{"CPU_CNT", U32, VALUE_CB(
kvp->value.uint32=prep->job_ptr->job_resrcs->ncpus;
)},
{"CPU_IDS", U16PP, VALUE_CB(
kvp->value.uint16pp.items=prep->ids;
kvp->value.uint16pp.item_counts=prep->id_cnts;
kvp->value.uint16pp.count=prep->job_ptr->job_resrcs->nhosts;
)},
{"NODE_CNT", U32, VALUE_CB(
kvp->value.uint32=prep->job_ptr->job_resrcs->nhosts;
)},
{"NODE_NAMES", CHARPP, VALUE_CB(
kvp->value.charpp.items=prep->node_names;
kvp->value.charpp.count=prep->name_cnt;
)},
{"MEM_PER_CPU", U64, VALUE_CB(
kvp->value.uint64=prep->mem_per_cpu;
)},
{"MEM_PER_NODE", U64, VALUE_CB(
kvp->value.uint64=prep->mem_per_node;
)},
{"TIME_LIMIT", U32, VALUE_CB(
kvp->value.uint32=prep->job_ptr->time_limit;
)},
{"WHOLE_NODE", U16, VALUE_CB(
kvp->value.uint16=prep->job_ptr->details->whole_node;
)},
{"COMMENT", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->comment;
)},
{"START_TIME", TIMEP, VALUE_CB(
kvp->value.timep=&(prep->job_ptr->start_time);
)},
{"USER", CHARP, VALUE_CB(
kvp->value.charp=prep->user;
)},
{"PARTITION_NAME", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->part_ptr->name;
)},
{"BILLABLE_CORES", U32, VALUE_CB(
kvp->value.uint32=(int)(prep->job_ptr->billable_tres);
)},
{"SMT_ENABLED", U16, VALUE_CB(
kvp->value.uint16=prep->smt_enabled;
)},
{"GPU_ALLOC_CNT", U32, VALUE_CB(
kvp->value.uint32=prep->gpu_alloc_cnt;
)},
{"SUBMIT_TIME", TIMEP, VALUE_CB(
kvp->value.timep=&(prep->job_ptr->details->submit_time);
)},
{"END_TIME", TIMEP, VALUE_CB(
kvp->value.timep=&(prep->job_ptr->end_time);
)},
{"JOB_STATE", CHARP, VALUE_CB(
kvp->value.charp=prep->job_state;
)},
{"GRES_DETAIL_STR", CHARPP, VALUE_CB(
kvp->value.charpp.items=prep->job_ptr->gres_detail_str;
kvp->value.charpp.count=prep->job_ptr->gres_detail_cnt;
)},
{"BATCH_FEATURES", CHARP, VALUE_CB(
kvp->value.charp=prep->job_ptr->batch_features;
)},
{"ARGV", CHARPP, VALUE_CB(
kvp->value.charpp.items=prep->job_ptr->details->argv;
kvp->value.charpp.count=prep->job_ptr->details->argc;
)},
{"JOB_SCRIPT", CHARP, VALUE_CB(
kvp->value.charp=prep->job_script;
)},
{"CORE_CNT", U32, VALUE_CB(
kvp->value.uint32=prep->core_cnt;
)},
{"SPANK_JOB_ENV", CHARPP, VALUE_CB(
kvp->value.charpp.items=prep->job_ptr->spank_job_env;
kvp->value.charpp.count=prep->job_ptr->spank_job_env_size;
)},
{NULL, NONE},
};
size_t size = sizeof(collectables)/sizeof(key_value_pair_t);
collect = xmalloc(sizeof(key_value_pair_t) * size);
// store_conf_values_or_defaults();
int rc = copy_key_value_pairs(collect, collectables, size);
if (rc) {
slurm_error(P_NAME "%s", error_to_string(rc));
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
extern void fini(void)
{
key_value_pair_t *ptr = collect;
if (ptr) {
while (ptr->key) {
xfree(ptr->key);
++ptr;
}
xfree(collect);
}
}
extern void prep_p_register_callbacks(prep_callbacks_t *callbacks)
{
/*
* Cannot safely run these without a valid callback, so disable
* them.
*/
if (!(prolog_slurmctld_callback = callbacks->prolog_slurmctld))
have_prolog_slurmctld = false;
if (!(epilog_slurmctld_callback = callbacks->epilog_slurmctld))
have_epilog_slurmctld = false;
}
extern int prep_p_prolog(job_env_t *job_env, slurm_cred_t *cred)
{
printf("Started job id: %d\n", job_env->jobid);
char *json;
job_record_t *job_ptr = find_job_record(job_env->jobid);
int rc;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, false);
if (_rc) {
slurm_error(P_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
// Some async task
if (rc == SLURM_SUCCESS) {
pika_metadata_log(job_ptr->job_id, json, false);
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
// Connects to the default NATS Server running locally
natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
// Simple publisher to send the given json string to subject "slurm_start_job"
natsConnection_PublishString(nc, "slurm_start_job", json);
natsSubscription_Destroy(sub);
natsConnection_Close(nc);
}
return SLURM_SUCCESS;
}
extern int prep_p_epilog(job_env_t *job_env, slurm_cred_t *cred)
{
printf("Ended job id: %d, exit code: %d\n", job_env->jobid, job_env->exit_code);
char *json;
job_record_t *job_ptr = find_job_record(job_env->jobid);
int rc;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, false);
if (_rc) {
slurm_error(P_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
// Some async task
if (rc == SLURM_SUCCESS) {
pika_metadata_log(job_ptr->job_id, json, false);
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
// Connects to the default NATS Server running locally
natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
// Simple publisher to send the given json string to subject "slurm_start_job"
natsConnection_PublishString(nc, "slurm_start_job", json);
natsSubscription_Destroy(sub);
natsConnection_Close(nc);
}
return SLURM_SUCCESS;
}
extern int prep_p_prolog_slurmctld(job_record_t *job_ptr, bool *async)
{
int rc = SLURM_SUCCESS;
char *user = uid_to_string_or_null(job_ptr->user_id);
/* Only collect data for the selected test users */
if (!user ||
(strcmp(user, "fwinkler") != 0 &&
strcmp(user, "rotscher") != 0)) {
xfree(user);
return rc;
}
xfree(user);
slurm_info(P_NAME "prep_p_prolog_slurmctld\n");
char *json = NULL;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, PROLOG);
if (_rc) {
slurm_error(P_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
*async = have_prolog_slurmctld;
if (*async) {
/* MUST run before async task finishes */
prolog_slurmctld_callback(rc, job_ptr->job_id);
}
// Some async task
if (rc == SLURM_SUCCESS) {
pika_metadata_log(job_ptr->job_id, json, false);
}
return rc;
}
extern int prep_p_epilog_slurmctld(job_record_t *job_ptr, bool *async)
{
int rc = SLURM_SUCCESS;
char *user = uid_to_string_or_null(job_ptr->user_id);
/* Only collect data for the selected test users */
if (!user ||
(strcmp(user, "fwinkler") != 0 &&
strcmp(user, "rotscher") != 0)) {
xfree(user);
return rc;
}
xfree(user);
slurm_info(P_NAME "prep_p_epilog_slurmctld\n");
char *json = NULL;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, EPILOG);
if (_rc) {
slurm_error(P_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
*async = have_epilog_slurmctld;
if (*async) {
/* MUST run before async task finishes */
epilog_slurmctld_callback(rc, job_ptr->job_id);
}
// Some async task
if (rc == SLURM_SUCCESS) {
pika_metadata_log(job_ptr->job_id, json, true);
}
return rc;
}
extern void prep_p_required(prep_call_type_t type, bool *required)
{
*required = false;
switch (type) {
case PREP_PROLOG_SLURMCTLD:
if (running_in_slurmctld())
*required = true;
break;
case PREP_EPILOG_SLURMCTLD:
if (running_in_slurmctld())
*required = true;
break;
case PREP_PROLOG:
case PREP_EPILOG:
if (running_in_slurmd())
*required = true;
break;
default:
return;
}
return;
}
/**
* Return string representation of error.
*/
static const char *error_to_string(internal_error_t error)
{
switch (error) {
case SUCCESS:
return "success";
case E_JSON_OUT_OF_MEMORY:
return "ran out of memory when building json string";
case E_HOST_LIST_SHIFT:
case E_HOST_LIST_CREATE:
return "failed to retrieve node names";
case E_PREFIX_PREPEND:
return "failed to prepend prefix to keys";
default:
return "unknown error";
}
}
/**
* Infer per-CPU or per-node memory usage. The unused field is set to NO_VAL64.
*/
static void get_memory(job_record_t *job_ptr, prep_t *preps)
{
uint64_t pn_min_memory = job_ptr->details->pn_min_memory;
preps->mem_per_cpu = NO_VAL64;
preps->mem_per_node = NO_VAL64;
if (pn_min_memory & MEM_PER_CPU){
preps->mem_per_cpu = pn_min_memory & ~MEM_PER_CPU;
} else {
preps->mem_per_node = pn_min_memory;
}
}
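/* Example (hypothetical request sizes): Slurm encodes a per-CPU request by
* setting the MEM_PER_CPU flag bit in pn_min_memory, so
*   pn_min_memory = MEM_PER_CPU | 2048  ->  mem_per_cpu  = 2048 MB
*   pn_min_memory = 64000               ->  mem_per_node = 64000 MB
* and the field that does not apply stays NO_VAL64. */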
/**
* Read the job resource core bitmap to infer cpu ids.
*/
static int get_cpu_ids(job_record_t *job_ptr, prep_t *prep)
{
int rc = SLURM_SUCCESS;
job_resources_t *job_resrcs = job_ptr->job_resrcs;
uint16_t ncores = 0;
uint32_t nhosts = job_resrcs->nhosts;
uint32_t reps_remain = 0;
int h, c, b, sock_ind;
bitoff_t offset;
bitstr_t *bitmap = job_resrcs->core_bitmap;
prep->ids = xmalloc(sizeof(uint16_t*) * nhosts);
prep->id_cnts = xmalloc(sizeof(uint16_t) * nhosts);
for(h = 0, sock_ind = 0, offset = 0; h < nhosts && !rc; ++h){
if(reps_remain == 0){
reps_remain = job_resrcs->sock_core_rep_count[sock_ind];
ncores = job_resrcs->sockets_per_node[sock_ind] *
job_resrcs->cores_per_socket[sock_ind];
++sock_ind;
}
--reps_remain;
prep->ids[h] = xmalloc(sizeof(uint16_t) * ncores);
for(c = 0, b = 0; c < ncores && b < ncores; ++b){
if(bit_test(bitmap, b + offset)){
prep->ids[h][c++] = b;
}
}
prep->id_cnts[h] = c;
offset += ncores;
}
prep->core_cnt = 0;
for (h = 0; h < nhosts; ++h) {
prep->core_cnt += prep->id_cnts[h];
}
return rc;
}
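/* Illustrative layout (hypothetical job): 2 hosts, 4 cores per host, with
* cores 0,1 allocated on host 0 and cores 2,3 on host 1. The core_bitmap is
* one ncores-wide block per host ("1100 0011" reading from bit 0 upwards),
* giving
*   ids = {{0, 1}, {2, 3}}, id_cnts = {2, 2}, core_cnt = 4. */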
/**
* Get node names as array by splitting the job resource nodes string.
*/
static int get_nodes_names(job_record_t *job_ptr, prep_t *prep)
{
int rc = 0, i;
char *host;
hostlist_t hl;
if ((hl = slurm_hostlist_create(job_ptr->job_resrcs->nodes))) {
prep->name_cnt = slurm_hostlist_count(hl);
prep->node_names = xmalloc(sizeof(char*) * prep->name_cnt);
for (i = 0; i < prep->name_cnt && !rc; ++i) {
if ((host = slurm_hostlist_shift(hl))) {
prep->node_names[i] = host;
} else {
rc = E_HOST_LIST_SHIFT;
}
}
hostlist_destroy(hl);
return rc;
}
return E_HOST_LIST_CREATE;
}
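/* Example (hypothetical node list): job_resrcs->nodes = "node[01-03]" is
* expanded by slurm_hostlist_shift() into
*   node_names = {"node01", "node02", "node03"}, name_cnt = 3. */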
static void free_nodes_names(char **node_names, int count)
{
int i;
char *item;
for (i = 0; i < count; ++i) {
item = node_names[i];
if (item) {
free(item);
}
}
xfree(node_names);
}
/**
* Get current job state.
*/
static void get_job_state(job_record_t *job_ptr, prep_t *prep)
{
prep->job_state = (char *)xmalloc(10 * sizeof(char));
if ( IS_JOB_RUNNING(job_ptr) )
sprintf(prep->job_state, "running");
else if ( IS_JOB_COMPLETE(job_ptr) )
sprintf(prep->job_state, "completed");
else if ( IS_JOB_CANCELLED(job_ptr) )
sprintf(prep->job_state, "cancelled");
else if ( IS_JOB_TIMEOUT(job_ptr) )
sprintf(prep->job_state, "timeout");
else if ( IS_JOB_OOM(job_ptr) )
sprintf(prep->job_state, "OOM");
else
sprintf(prep->job_state, "failed");
}
static int get_gpu_number( job_record_t *job_ptr, prep_t *prep)
{
int rc = SLURM_SUCCESS;
slurm_info(P_NAME "get_gpu_number job_ptr->gres_used=%s\n", job_ptr->gres_used);
//check gpu_alloc_cnt
if ( job_ptr->gres_used != NULL ) {
char *gres_alloc = strdup(job_ptr->gres_used);
/* Note: strtok() treats "gpu:" as a set of delimiter characters, so this
* keeps the last numeric token of a string such as "gpu:4" */
char *ptr = strtok(gres_alloc, "gpu:");
while (ptr != NULL) {
prep->gpu_alloc_cnt = atoi(ptr);
ptr = strtok(NULL, "gpu:");
}
free(gres_alloc);
}
return rc;
}
/**
* Print string at offset to a xmalloced string.
* Memory will be resized as needed.
*/
static int xsnprintf_realloc(char **json, size_t offset, const char *fmt, ...)
{
va_list args;
size_t size = xsize(*json);
int rc;
va_start(args, fmt);
rc = vsnprintf(*json + offset, size - offset, fmt, args);
va_end(args);
if (rc < 0)
return rc;
if (rc >= size - offset) {
do {
if (size >= SIZE_MAX / 2) {
return E_JSON_OUT_OF_MEMORY;
}
size *= 2;
} while (rc >= size - offset);
*json = xrealloc(*json, sizeof(char) * size);
va_start(args, fmt);
rc = vsnprintf(*json + offset, size - offset, fmt, args);
va_end(args);
}
return rc;
}
/**
* Prepend prefix to string and xmallocs new memory with provided size.
*/
static int xprepend_prefix(char **dst, char *src, size_t size)
{
char buffer[size];
int rc = snprintf(buffer, size, "%s%s", prefix, src);
*dst = xstrndup(buffer, size);
return rc;
}
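/* Example: with prefix = "JOB_RECORD__",
*   xprepend_prefix(&key, "CPU_CNT", BUFF_LEN)
* leaves key pointing at the xstrndup'd string "JOB_RECORD__CPU_CNT". */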
static int json_init(char **json, size_t offset, uint32_t job_id)
{
return xsnprintf_realloc(json, offset, "{\"%"PRIu32"\":{", job_id);
}
static int
json_append_uint16(char **json, size_t offset, const char *key, uint16_t value)
{
return xsnprintf_realloc(json, offset, "\"%s\": %"PRIu16",", key, value);
}
static int
json_append_uint32(char **json, size_t offset, const char *key, uint32_t value)
{
return xsnprintf_realloc(json, offset, "\"%s\": %"PRIu32",", key, value);
}
static int
json_append_uint64(char **json, size_t offset, const char *key, uint64_t value)
{
return xsnprintf_realloc(json, offset, "\"%s\": %"PRIu64",", key, value);
}
static int
json_append_uint16_array_2d(char **json, size_t offset, const char *key,
uint16_t **values,
uint16_t *value_counts,
uint32_t count)
{
int i, j;
int error = 0;
int rc = 0;
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset, "\"%s\": [", key));
for (i = 0; i < count; ++i) {
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "["));
for (j = 0; j < value_counts[i]; ++j) {
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "%i,", values[i][j]));
}
if (j > 0) {
(*json)[strlen(*json) - 1] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "],"));
}
if (i > 0) {
(*json)[strlen(*json) - 1] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "],"));
return rc;
}
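/* Example output fragment (hypothetical values):
*   "JOB_RECORD__CPU_IDS": [[0,1],[2,3]],
* The trailing comma written after the last element of each sub-array and
* after the last sub-array is overwritten by the closing bracket. */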
static int
json_append_string_array(char **json, size_t offset, const char* key,
char **values, int count)
{
int rc = 0, error = 0;
int i;
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset, "\"%s\": [", key));
for (i = 0; i < count; ++i) {
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "\"%s\",", values[i]));
}
if (i > 0) {
(*json)[offset + rc] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, xsnprintf_realloc(json, offset + rc, "],"));
return rc;
}
static int
json_append_string(char **json, size_t offset, const char *key,
const char *value)
{
return xsnprintf_realloc(json, offset, "\"%s\": \"%s\",", key, value);
}
static int json_append_time(char **json, size_t offset, const char *key,
time_t *time_ptr)
{
char time_str[32];
slurm_make_time_str(time_ptr, time_str, 32);
return xsnprintf_realloc(json, offset, "\"%s\": \"%s\",", key, time_str);
}
static int json_fini(char **json, bool has_data)
{
size_t offset = strlen(*json);
if (has_data) {
(*json)[offset - 1] = '}';
return xsnprintf_realloc(json, offset, "%c", '}');
} else {
return xsnprintf_realloc(json, offset, "%s", "}}");
}
}
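/* Sketch of the resulting document shape (job id and values hypothetical;
* the actual members depend on the collect table and the exclude lists):
*
*   {"4711": {
*       "JOB_RECORD__CPU_CNT": 8,
*       "JOB_RECORD__NODE_NAMES": ["node01","node02"],
*       "JOB_RECORD__START_TIME": "2024-02-14T04:45:40"
*   }}
*
* json_init() opens the outer and inner objects, each json_append_*() helper
* adds one comma-terminated member, and json_fini() turns the last trailing
* comma into the closing braces. */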
static int
json_append_key_value_pair(char **json, size_t offset, key_value_pair_t *kvp)
{
switch (kvp->type) {
case U16:
return json_append_uint16(
json, offset, kvp->key,
kvp->value.uint16
);
case U32:
return json_append_uint32(
json, offset, kvp->key,
kvp->value.uint32
);
case U64:
return json_append_uint64(
json, offset, kvp->key,
kvp->value.uint64
);
case CHARPP:
return json_append_string_array(
json, offset, kvp->key,
kvp->value.charpp.items,
kvp->value.charpp.count
);
case CHARP:
return json_append_string(
json, offset, kvp->key,
kvp->value.charp
);
case U16PP:
return json_append_uint16_array_2d(
json, offset, kvp->key,
kvp->value.uint16pp.items,
kvp->value.uint16pp.item_counts,
kvp->value.uint16pp.count
);
case TIMEP:
return json_append_time(
json, offset, kvp->key, kvp->value.timep
);
default:
return xsnprintf_realloc(
json, offset,
"\"%s\":\"data type not recognised\",", kvp->key
);
}
}
static bool
key_required(const char *key, bool *all, bool *collect)
{
collect[PROLOG] = all[PROLOG] || (key && !strstr(exclude_keys_prolog, key));
collect[EPILOG] = all[EPILOG] || (key && !strstr(exclude_keys_epilog, key));
return collect[PROLOG] || collect[EPILOG];
}
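/* Example (hypothetical exclude list): with
*   exclude_keys_prolog = "JOB_SCRIPT,ARGV"
* key_required("JOB_SCRIPT", all, collect) sets collect[PROLOG] = false while
* collect[EPILOG] stays true (an empty epilog exclude list means "collect
* everything"). Note that strstr() does plain substring matching, so
* excluding "USER" would also exclude "USER_ID". */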
/**
* Copy key/value pairs and prepend the prefix to each key; each new key is
* xmalloc'd, or set to NULL on error.
* Returns SUCCESS or E_PREFIX_PREPEND.
*/
static internal_error_t
copy_key_value_pairs(key_value_pair_t *dst, key_value_pair_t *src, size_t size)
{
int rc = SLURM_SUCCESS;
int i;
bool all[] = {
strlen(exclude_keys_prolog) == 0,
strlen(exclude_keys_epilog) == 0,
};
bool collect[2];
key_value_pair_t *dst_ptr = dst;
key_value_pair_t *src_ptr = src;
for (i = 0; i < size && rc >= 0; ++i, ++src_ptr) {
if (!src_ptr->key) {
/* always copy the {NULL, NONE} terminator entry */
*dst_ptr++ = *src_ptr;
} else if (key_required(src_ptr->key, all, collect)) {
*dst_ptr = *src_ptr;
dst_ptr->collect[PROLOG] = collect[PROLOG];
dst_ptr->collect[EPILOG] = collect[EPILOG];
dst_ptr->key = NULL;
rc = xprepend_prefix(&(dst_ptr->key), src_ptr->key, BUFF_LEN);
++dst_ptr;
}
}
return rc >= 0 ? SUCCESS : E_PREFIX_PREPEND;
}
static char _convert_dec_hex(char x)
{
if (x <= 9)
x += '0';
else
x += 'A' - 10;
return x;
}
/* Escape characters according to RFC7159 and ECMA-262 11.8.4.2 */
static char *_json_escape(const char *str)
{
char *ret = NULL;
int i, o, len;
len = strlen(str) * 2 + 128;
ret = xmalloc(len);
for (i = 0, o = 0; str[i]; ++i) {
if (o >= MAX_STR_LEN) {
break;
} else if ((o + 8) >= len) {
len *= 2;
ret = xrealloc(ret, len);
}
switch (str[i]) {
case '\\':
ret[o++] = '\\';
ret[o++] = '\\';
break;
case '"':
ret[o++] = '\\';
ret[o++] = '\"';
break;
case '\n':
ret[o++] = '\\';
ret[o++] = 'n';
break;
case '\b':
ret[o++] = '\\';
ret[o++] = 'b';
break;
case '\f':
ret[o++] = '\\';
ret[o++] = 'f';
break;
case '\r':
ret[o++] = '\\';
ret[o++] = 'r';
break;
case '\t':
ret[o++] = '\\';
ret[o++] = 't';
break;
case '/':
ret[o++] = '\\';
ret[o++] = '/';
break;
default:
/* use hex for all other control characters */
if (str[i] <= 0x1f || str[i] == '\'' || str[i] == '<' ||
str[i] == 0x5C) {
ret[o++] = '\\';
ret[o++] = 'u';
ret[o++] = '0';
ret[o++] = '0';
ret[o++] =
_convert_dec_hex((0xf0 & str[i]) >> 4);
ret[o++] = _convert_dec_hex(0x0f & str[i]);
} else /* normal character */
ret[o++] = str[i];
}
}
return ret;
}
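/* Example: _json_escape("a \"quoted\" line\n100%/s") returns an xmalloc'd
* string containing
*   a \"quoted\" line\n100%\/s
* i.e. quote, newline and slash become two-character escapes per RFC 7159,
* and remaining control characters are emitted as \u00XX sequences. */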
static internal_error_t
job_ptr_to_json(job_record_t *job_ptr, char **json, context_t context)
{
key_value_pair_t *kvp = collect;
internal_error_t rc = SUCCESS;
prep_t prep[1] = {{ .job_ptr = job_ptr }};
int h;
size_t max_len = BUFF_LEN;
size_t offset = 0;
bool has_data = false;
buf_t *script;
get_memory(job_ptr, prep);
if ((rc = get_nodes_names(job_ptr, prep)) ||
(rc = get_cpu_ids(job_ptr, prep))) {
return rc;
}
// Get user name
prep->user = uid_to_string_or_null(job_ptr->user_id);
// Check if multithreading is requested (this is a workaround)
prep->smt_enabled = 0;
if ( job_ptr->details->mc_ptr->threads_per_core == NO_VAL16 )
prep->smt_enabled = 1;
// Get job state
get_job_state(job_ptr, prep);
// Get GPU data
if ( (rc = get_gpu_number(job_ptr, prep)) )
return rc;
// Escape characters according to RFC7159 and ECMA-262 11.8.4.2
prep->job_name = _json_escape(job_ptr->name);
script = get_job_script(job_ptr);
if (script) {
prep->job_script = _json_escape(get_buf_data(script));
}
*json = xmalloc(sizeof(char) * max_len);
rc = json_init(json, 0, job_ptr->job_id);
offset += rc;
while (kvp && kvp->key && rc >= 0) {
if (kvp->collect[context]) {
has_data = true;
kvp->get_value(kvp, prep);
rc = json_append_key_value_pair(json, offset, kvp);
if (rc >= 0) {
offset += rc;
}
}
++kvp;
}
if (rc >= 0) {
rc = json_fini(json, has_data);
}
free_nodes_names(prep->node_names, prep->name_cnt);
for (h = 0; h < job_ptr->job_resrcs->nhosts; ++h) {
xfree(prep->ids[h]);
}
xfree(prep->ids);
xfree(prep->id_cnts);
xfree(prep->user);
xfree(prep->job_state);
xfree(prep->job_name);
if (script)
xfree(prep->job_script);
free_buf(script);
return rc >= 0 ? SUCCESS : rc;
}
static int pika_metadata_log(int job_id, char *json, bool is_epilog)
{
FILE *fp;
internal_error_t rc = SUCCESS;
char file_path[100];
sprintf(file_path, "/tmp/pika_debug/pika_prep_%d.log", job_id);
// sprintf(file_path, "/var/log/slurm/pika.log");
slurm_mutex_lock(&plugin_log_lock);
fp = fopen(file_path, "a");
if ( fp == NULL ) {
slurm_mutex_unlock(&plugin_log_lock);
xfree(json);
return (-1);
}
fprintf(fp, "----\n");
if ( !is_epilog ) {
fprintf(fp, "Prolog Logging of job %d\n", job_id);
} else {
fprintf(fp, "Epilog Logging of job %d\n", job_id);
}
fprintf(fp, "%s\n", json);
xfree(json);
fclose(fp);
slurm_mutex_unlock(&plugin_log_lock);
return rc >= 0 ? SUCCESS : rc;
}
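/* Example of an entry appended to /tmp/pika_debug/pika_prep_<jobid>.log
* (job id hypothetical):
*
*   ----
*   Prolog Logging of job 4711
*   {"4711":{"JOB_RECORD__ARRAY_ID": 0, ... }}
*/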