diff --git a/collectors/slurmJobDetector.go b/collectors/slurmJobDetector.go index e43a686..3022744 100644 --- a/collectors/slurmJobDetector.go +++ b/collectors/slurmJobDetector.go @@ -15,16 +15,28 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) +// These are the fields we read from the JSON configuration +type SlurmJobDetectorConfig struct { + Interval string `json:"interval"` + SendJobEvents bool `json:"send_job_events,omitempty"` + SendStepEvents bool `json:"send_step_events,omitempty"` + SendJobMetrics bool `json:"send_job_metrics,omitempty"` + SendStepMetrics bool `json:"send_step_metrics,omitempty"` + BaseDirectory string `json:"sysfs_base,omitempty"` + CgroupVersion string `json:"cgroup_version"` +} + +// This information is sent as JSON when an event occurs type SlurmJobMetadata struct { - UID uint64 `json:"uid"` - JobId uint64 `json:"jobid"` + UID string `json:"uid,omitempty"` + JobId string `json:"jobid"` Timestamp uint64 `json:"timestamp"` Status string `json:"status"` Step string `json:"step,omitempty"` Cpus []int `json:"cpus,omitempty"` Memories []int `json:"memories,omitempty"` - MemoryLimitHard uint64 `json:"memory_limit_hard,omitempty"` - MemoryLimitSoft uint64 `json:"memory_limit_soft,omitempty"` + MemoryLimitHard int64 `json:"memory_limit_hard,omitempty"` + MemoryLimitSoft int64 `json:"memory_limit_soft,omitempty"` Devices []string `json:"devices,omitempty"` } @@ -36,89 +48,61 @@ type SlurmJobMetrics struct { CpuUsageSys int64 } -type SlurmJobStepData struct { - Metrics SlurmJobMetrics - Step string -} - -type SlurmJobData struct { - Metrics SlurmJobMetrics - Steps []SlurmJobStepData -} - -// These are the fields we read from the JSON configuration -type SlurmJobDetectorConfig struct { - Interval string `json:"interval"` - SendJobEvents bool `json:"send_job_events,omitempty"` - SendStepEvents bool `json:"send_step_events,omitempty"` - SendJobMetrics bool `json:"send_job_metrics,omitempty"` - SendStepMetrics bool `json:"send_step_metrics,omitempty"` - ExcludeUsers []string `json:"exclude_users,omitempty"` - BaseDirectory string `json:"sysfs_base,omitempty"` -} - // This contains all variables we need during execution and the variables // defined by metricCollector (name, init, ...) 
type SlurmJobDetector struct { metricCollector - config SlurmJobDetectorConfig // the configuration structure - meta map[string]string // default meta information - tags map[string]string // default tags - //jobs map[string]map[string]SlurmJobData - interval time.Duration // the interval parsed from configuration - ticker *time.Ticker // own timer - output chan lp.CCMetric // own internal output channel - wg sync.WaitGroup // sync group for management - done chan bool // channel for management - files map[string]struct{} + config SlurmJobDetectorConfig // the configuration structure + meta map[string]string // default meta information + interval time.Duration // the interval parsed from configuration + ticker *time.Ticker // own timer for event checking + output chan lp.CCMetric // variable to save output channel at Read() for event sending + wg sync.WaitGroup // sync group for event checking management + done chan bool // channel for event checking management + directories map[string]SlurmJobMetadata // directory -> data mapping (data stored to re-send data in end job events) } const default_base_dir = "/sys/fs/cgroup" +const default_cgroup_version = "v1" -var cpuacct_base = fmt.Sprintf("%s/cpuacct/slurm", default_base_dir) -var memory_base = fmt.Sprintf("%s/memory/slurm", default_base_dir) -var cpuset_base = fmt.Sprintf("%s/cpuset/slurm", default_base_dir) -var devices_base = fmt.Sprintf("%s/devices/slurm", default_base_dir) +// These variables do not need to be pre-initialized; they are overwritten in Init() based on the configuration +var cpuacct_base = filepath.Join(default_base_dir, "cpuacct", "slurm") +var memory_base = filepath.Join(default_base_dir, "memory", "slurm") +var cpuset_base = filepath.Join(default_base_dir, "cpuset", "slurm") +var devices_base = filepath.Join(default_base_dir, "devices", "slurm") -func getSlurmJobs() []string { - out := make([]string, 0) - globpattern := filepath.Join(cpuacct_base, "uid_[0-9]*", "job_[0-9]*") +// Filenames for cgroup/v1 +var limit_in_bytes_file = "memory.limit_in_bytes" +var soft_limit_in_bytes_file = "memory.soft_limit_in_bytes" +var cpus_effective_file = "cpuset.effective_cpus" +var mems_effective_file = "cpuset.effective_mems" +var devices_list_file = "devices.list" +var usage_in_bytes_file = "memory.usage_in_bytes" +var max_usage_in_bytes_file = "memory.max_usage_in_bytes" +var cpuacct_usage_file = "cpuacct.usage" +var cpuacct_usage_user_file = "cpuacct.usage_user" - dirs, err := filepath.Glob(globpattern) +// Filenames for cgroup/v2 +// In Init() the filenames are set based on configuration +const soft_limit_in_bytes_file_v2 = "memory.high" +const limit_in_bytes_file_v2 = "memory.max" +const cpus_effective_file_v2 = "cpuset.cpus.effective" +const mems_effective_file_v2 = "cpuset.mems.effective" +const devices_list_file_v2 = "devices.list" +const usage_in_bytes_file_v2 = "memory.usage_in_bytes" +const max_usage_in_bytes_file_v2 = "memory.max_usage_in_bytes" +const cpuacct_usage_file_v2 = "cpuacct.usage" +const cpuacct_usage_user_file_v2 = "cpuacct.usage_user" + +func fileToInt64(filename string) (int64, error) { + data, err := os.ReadFile(filename) if err == nil { - for _, d := range dirs { - r, err := filepath.Rel(cpuacct_base, d) - if err == nil { - out = append(out, r) - } + // cgroup files end with a newline, so trim whitespace before parsing + x, err := strconv.ParseInt(strings.TrimSpace(string(data)), 0, 64) + if err == nil { + return x, err } } - return out -} - -func getSlurmSteps() []string { - out := make([]string, 0) - globpattern := filepath.Join(cpuacct_base, "uid_[0-9]*", "job_[0-9]*", "step_*") - - dirs, err :=
filepath.Glob(globpattern) - if err == nil { - out = append(out, dirs...) - } - return out -} - -func getId(prefix, str string) (uint64, error) { - var s string - format := prefix + "_%s" - _, err := fmt.Sscanf(str, format, &s) - if err != nil { - return 0, err - } - id, err := strconv.ParseInt(s, 0, 64) - if err != nil { - return 0, err - } - return uint64(id), nil + return 0, err } func ExpandList(strlist string) []int { @@ -151,42 +135,326 @@ func ExpandList(strlist string) []int { func ParseDevices(devlist string) []string { out := make([]string, 0) + // example devices.list entry: "a *:* rwm" return out } +func GetPathParts(path string) []string { + out := make([]string, 0) + uid := "" + jobid := "" + step := "" + parts := strings.Split(path, "/") + // the folders of interest are at the end of the list, so traverse + // from the back + for i := len(parts) - 1; i >= 0; i-- { + if strings.HasPrefix(parts[i], "uid_") { + uid = parts[i] + } else if strings.HasPrefix(parts[i], "job_") { + jobid = parts[i] + } else if strings.HasPrefix(parts[i], "step_") { + step = parts[i] + } + } + // only cgroup/v1 provides a uid; if present, it must be the first entry + if len(uid) > 0 { + out = append(out, uid) + } + if len(jobid) > 0 { + out = append(out, jobid) + } + // only if it's a step folder + if len(step) > 0 { + out = append(out, step) + } + return out +} + +func GetIdsFromParts(parts []string) (string, string, string) { + uid := "" + jobid := "" + step := "" + + for _, p := range parts { + if strings.HasPrefix(p, "job_") { + jobid = strings.TrimPrefix(p, "job_") + } else if strings.HasPrefix(p, "uid_") { + uid = strings.TrimPrefix(p, "uid_") + } else if strings.HasPrefix(p, "step_") { + step = strings.TrimPrefix(p, "step_") + } + } + return uid, jobid, step +} + +func (m *SlurmJobDetector) CheckEvents(timestamp time.Time) { + var err error = nil + var dirs []string = nil + parts := make([]string, 0, 4) + parts = append(parts, cpuacct_base) + if m.config.CgroupVersion == "v1" { + parts = append(parts, "uid_[0-9]*") + } + parts = append(parts, "job_[0-9]*") + + dirs, err = filepath.Glob(filepath.Join(parts...)) + if err != nil { + cclog.ComponentError(m.name, "Cannot get directory list for SLURM jobs") + return + } + if m.config.SendStepEvents { + parts = append(parts, "step_*") + sdirs, err := filepath.Glob(filepath.Join(parts...)) + if err != nil { + cclog.ComponentError(m.name, "Cannot get directory list for SLURM steps") + return + } + dirs = append(dirs, sdirs...)
+ } + + for _, d := range dirs { + // Folder not in known directories map -> New job + if _, ok := m.directories[d]; !ok { + dirParts := GetPathParts(d) + data, err := m.NewJobEvent(dirParts, timestamp, m.output) + if err == nil { + // Add the directory to the map + cclog.ComponentDebug(m.name, "Adding directory ", d, " to known directories") + m.directories[d] = data + } + } + } + for d, data := range m.directories { + // Known directory but it does not exist anymore -> Vanished/Finished job + if _, ok := stringArrayContains(dirs, d); !ok { + dirParts := GetPathParts(d) + err := m.EndJobEvent(dirParts, data, timestamp, m.output) + if err != nil { + uid, jobid, step := GetIdsFromParts(dirParts) + if len(step) == 0 { + cclog.ComponentError(m.name, "Failed to end job for user ", uid, " jobid ", jobid) + } else { + cclog.ComponentError(m.name, "Failed to end job for user ", uid, " jobid ", jobid, " step ", step) + } + } + // Remove the directory from the map + cclog.ComponentDebug(m.name, "Removing directory ", d, " from known directories") + delete(m.directories, d) + } + } +} + +func (m *SlurmJobDetector) NewJobEvent(parts []string, timestamp time.Time, output chan lp.CCMetric) (SlurmJobMetadata, error) { + uid, jobid, step := GetIdsFromParts(parts) + pathstr := filepath.Join(parts...) + if len(jobid) == 0 { + cclog.ComponentError(m.name, "No jobid in path ", pathstr) + return SlurmJobMetadata{}, fmt.Errorf("no jobid in path %s", pathstr) + } + jobtags := map[string]string{ + "type": "job", + "type-id": jobid, + } + + // Fill job JSON with data from cgroup + md := SlurmJobMetadata{ + JobId: jobid, + Timestamp: uint64(timestamp.Unix()), + Status: "start", + } + // cgroup/v2 has no uid in parts + if len(uid) > 0 { + md.UID = uid + } + if len(step) > 0 { + md.Step = step + jobtags["stype"] = "step" + jobtags["stype-id"] = step + } + + job_cpus, err := os.ReadFile(filepath.Join(cpuset_base, pathstr, cpus_effective_file)) + if err == nil { + md.Cpus = ExpandList(string(job_cpus)) + } + job_mems, err := os.ReadFile(filepath.Join(cpuset_base, pathstr, mems_effective_file)) + if err == nil { + md.Memories = ExpandList(string(job_mems)) + } + job_devs, err := os.ReadFile(filepath.Join(devices_base, pathstr, devices_list_file)) + if err == nil { + md.Devices = ParseDevices(string(job_devs)) + } + x, err := fileToInt64(filepath.Join(memory_base, pathstr, limit_in_bytes_file)) + if err == nil { + md.MemoryLimitHard = x + } + x, err = fileToInt64(filepath.Join(memory_base, pathstr, soft_limit_in_bytes_file)) + if err == nil { + md.MemoryLimitSoft = x + } + + jobjson, err := json.Marshal(md) + if err == nil { + y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) + if err == nil { + if len(uid) > 0 { + y.AddMeta("uid", uid) + uname, err := osuser.LookupId(uid) + if err == nil { + y.AddMeta("username", uname.Username) + } + } + y.AddMeta("metric_type", "event") + output <- y + } + } + return md, nil +} + +// Not sure if it works with steps since the folders commonly do not vanish when a job step is finished +func (m *SlurmJobDetector) EndJobEvent(parts []string, data SlurmJobMetadata, timestamp time.Time, output chan lp.CCMetric) error { + uid, jobid, step := GetIdsFromParts(parts) + pathstr := filepath.Join(parts...)
+ if len(jobid) == 0 { + err := fmt.Errorf("no jobid in path %s", pathstr) + cclog.ComponentError(m.name, err.Error()) + return err + } + jobtags := map[string]string{ + "type": "job", + "type-id": jobid, + } + + // Fill job JSON with data from cgroup + md := SlurmJobMetadata{ + JobId: jobid, + Timestamp: uint64(timestamp.Unix()), + Cpus: data.Cpus, + Memories: data.Memories, + Devices: data.Devices, + MemoryLimitHard: data.MemoryLimitHard, + MemoryLimitSoft: data.MemoryLimitSoft, + Status: "end", + } + // cgroup/v2 has no uid in parts + if len(uid) > 0 { + md.UID = uid + } + if len(step) > 0 { + md.Step = step + jobtags["stype"] = "step" + jobtags["stype-id"] = step + } + + jobjson, err := json.Marshal(md) + if err == nil { + y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) + if err == nil { + if len(uid) > 0 { + y.AddMeta("uid", uid) + uname, err := osuser.LookupId(uid) + if err == nil { + y.AddMeta("username", uname.Username) + } + } + y.AddMeta("metric_type", "event") + output <- y + } else { + return err + } + } else { + return err + } + return nil +} + +func (m *SlurmJobDetector) ReadMetrics(parts []string) (SlurmJobMetrics, error) { + jobdata := SlurmJobMetrics{ + MemoryUsage: 0, + MaxMemoryUsage: 0, + LimitMemoryUsage: 0, + CpuUsageUser: 0, + CpuUsageSys: 0, + } + + part := filepath.Join(parts...) + + x, err := fileToInt64(filepath.Join(memory_base, part, usage_in_bytes_file)) + if err == nil { + jobdata.MemoryUsage = x + } + x, err = fileToInt64(filepath.Join(memory_base, part, max_usage_in_bytes_file)) + if err == nil { + jobdata.MaxMemoryUsage = x + } + tu, err := fileToInt64(filepath.Join(cpuacct_base, part, cpuacct_usage_file)) + if err == nil { + uu, err := fileToInt64(filepath.Join(cpuacct_base, part, cpuacct_usage_user_file)) + if err == nil && tu > 0 { + // multiply before dividing to get an integer percentage instead of truncating to zero + jobdata.CpuUsageUser = (uu * 100) / tu + jobdata.CpuUsageSys = 100 - jobdata.CpuUsageUser + } + + } + return jobdata, nil +} + +func (m *SlurmJobDetector) SendMetrics(jobtags, jobmeta map[string]string, jobmetrics SlurmJobMetrics, timestamp time.Time, output chan lp.CCMetric) { + + y, err := lp.New("job_mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MemoryUsage}, timestamp) + if err == nil { + y.AddMeta("unit", "Bytes") + for k, v := range jobmeta { + y.AddMeta(k, v) + } + output <- y + } + y, err = lp.New("job_max_mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MaxMemoryUsage}, timestamp) + if err == nil { + y.AddMeta("unit", "Bytes") + for k, v := range jobmeta { + y.AddMeta(k, v) + } + output <- y + } + y, err = lp.New("job_cpu_user", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageUser}, timestamp) + if err == nil { + y.AddMeta("unit", "%") + for k, v := range jobmeta { + y.AddMeta(k, v) + } + output <- y + } + y, err = lp.New("job_cpu_sys", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageSys}, timestamp) + if err == nil { + y.AddMeta("unit", "%") + for k, v := range jobmeta { + y.AddMeta(k, v) + } + output <- y + } +} + // Init initializes the sample collector // Called once by the collector manager // All tags, meta data tags and metrics that do not change over the runtime should be set here func (m *SlurmJobDetector) Init(config json.RawMessage) error { var err error = nil - // Always set the name early in Init() to use it in cclog.Component* functions m.name = "SlurmJobDetector" // This is for later use, also call it early m.setup() - // Tell whether the collector should be run in
parallel with others (reading files, ...) - // or it should be run serially, mostly for collectors actually doing measurements - // because they should not measure the execution of the other collectors + // Can be run in parallel with others m.parallel = true // Define meta information sent with each metric - // (Can also be dynamic or this is the basic set with extension through AddMeta()) m.meta = map[string]string{"source": m.name, "group": "SLURM"} - // Define tags sent with each metric - // The 'type' tag is always needed, it defines the granularity of the metric - // node -> whole system - // socket -> CPU socket (requires socket ID as 'type-id' tag) - // die -> CPU die (requires CPU die ID as 'type-id' tag) - // memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag) - // llc -> Last level cache (requires last level cache ID as 'type-id' tag) - // core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag) - // hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag) - // accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag) - m.tags = map[string]string{"type": "node"} - // Read in the JSON configuration + // Set configuration defaults m.config.SendJobEvents = false m.config.SendJobMetrics = false m.config.SendStepEvents = false m.config.SendStepMetrics = false + m.config.CgroupVersion = default_cgroup_version m.config.BaseDirectory = default_base_dir + // Read in the JSON configuration if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -201,6 +469,10 @@ func (m *SlurmJobDetector) Init(config json.RawMessage) error { cclog.ComponentError(m.name, "Error parsing interval:", err.Error()) return err } + if m.config.CgroupVersion != "v1" && m.config.CgroupVersion != "v2" { + err = fmt.Errorf("invalid cgroup version %s", m.config.CgroupVersion) + cclog.ComponentError(m.name, err.Error()) + return err + } // Storage for output channel m.output = nil @@ -209,15 +481,40 @@ func (m *SlurmJobDetector) Init(config json.RawMessage) error { // Create the own ticker m.ticker = time.NewTicker(m.interval) // Create space for storing files - m.files = make(map[string]struct{}) + m.directories = make(map[string]SlurmJobMetadata) - cpuacct_base = fmt.Sprintf("%s/cpuacct/slurm", m.config.BaseDirectory) - memory_base = fmt.Sprintf("%s/memory/slurm", m.config.BaseDirectory) - cpuset_base = fmt.Sprintf("%s/cpuset/slurm", m.config.BaseDirectory) - devices_base = fmt.Sprintf("%s/devices/slurm", m.config.BaseDirectory) + if _, err := os.Stat(m.config.BaseDirectory); err != nil { + err := fmt.Errorf("cannot find base folder %s", m.config.BaseDirectory) + cclog.ComponentError(m.name, err.Error()) + return err + } cclog.ComponentDebug(m.name, "Using base directory", m.config.BaseDirectory) + cpuacct_base = filepath.Join(m.config.BaseDirectory, "cpuacct", "slurm") + memory_base = filepath.Join(m.config.BaseDirectory, "memory", "slurm") + cpuset_base = filepath.Join(m.config.BaseDirectory, "cpuset", "slurm") + devices_base = filepath.Join(m.config.BaseDirectory, "devices", "slurm") + if m.config.CgroupVersion == "v2" { + cclog.ComponentDebug(m.name, "Reconfiguring folders and filenames for cgroup/v2") + cpuacct_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope") + memory_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope") + cpuset_base = filepath.Join(m.config.BaseDirectory, "system.slice",
"slurmstepd.scope") + devices_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope") + cpus_effective_file = cpus_effective_file_v2 + mems_effective_file = mems_effective_file_v2 + devices_list_file = devices_list_file_v2 + limit_in_bytes_file = limit_in_bytes_file_v2 + soft_limit_in_bytes_file = soft_limit_in_bytes_file_v2 + usage_in_bytes_file = usage_in_bytes_file_v2 + max_usage_in_bytes_file = max_usage_in_bytes_file_v2 + cpuacct_usage_file = cpuacct_usage_file_v2 + cpuacct_usage_user_file = cpuacct_usage_user_file_v2 + } + if _, err := os.Stat(cpuacct_base); err != nil { + err := fmt.Errorf("cannot find SLURM cgroup folder %s", cpuacct_base) + cclog.ComponentError(m.name, err.Error()) + return err + } - // Start the timer loop with return functionality by sending 'true' to the done channel m.wg.Add(1) go func() { for { @@ -228,10 +525,8 @@ func (m *SlurmJobDetector) Init(config json.RawMessage) error { m.wg.Done() return case timestamp := <-m.ticker.C: - // This is executed every timer tick but we have to wait until the first - // Read() to get the output channel - cclog.ComponentDebug(m.name, "Checking events") if m.output != nil { + cclog.ComponentDebug(m.name, "Checking events") m.CheckEvents(timestamp) } } @@ -243,508 +538,76 @@ func (m *SlurmJobDetector) Init(config json.RawMessage) error { return err } -func ReadJobData(userdir, jobdir string) (SlurmJobMetrics, error) { - jobdata := SlurmJobMetrics{ - MemoryUsage: 0, - MaxMemoryUsage: 0, - LimitMemoryUsage: 0, - CpuUsageUser: 0, - CpuUsageSys: 0, - } - job_mem := filepath.Join(memory_base, userdir, jobdir, "memory.usage_in_bytes") - mem_usage, err := os.ReadFile(job_mem) - if err == nil { - x, err := strconv.ParseInt(string(mem_usage), 0, 64) - if err == nil { - jobdata.MemoryUsage = x - } - } - job_mem = filepath.Join(memory_base, userdir, jobdir, "memory.max_usage_in_bytes") - mem_usage, err = os.ReadFile(job_mem) - if err == nil { - x, err := strconv.ParseInt(string(mem_usage), 0, 64) - if err == nil { - jobdata.MaxMemoryUsage = x - } - } - job_cpu := filepath.Join(cpuacct_base, userdir, jobdir, "cpuacct.usage") - total_usage, err := os.ReadFile(job_cpu) - if err == nil { - tu, err := strconv.ParseInt(string(total_usage), 0, 64) - if err == nil { - job_cpu = filepath.Join(cpuacct_base, userdir, jobdir, "cpuacct.usage_user") - user_usage, err := os.ReadFile(job_cpu) - if err == nil { - uu, err := strconv.ParseInt(string(user_usage), 0, 64) - if err == nil { - jobdata.CpuUsageUser = int64(uu/tu) * 100 - jobdata.CpuUsageSys = 100 - jobdata.CpuUsageUser - } - } - } - } - return jobdata, nil -} - -func ReadJobStepData(userdir, jobdir, stepdir string) (SlurmJobMetrics, error) { - jobdata := SlurmJobMetrics{ - MemoryUsage: 0, - MaxMemoryUsage: 0, - LimitMemoryUsage: 0, - CpuUsageUser: 0, - CpuUsageSys: 0, - } - job_mem := filepath.Join(memory_base, userdir, jobdir, stepdir, "memory.usage_in_bytes") - mem_usage, err := os.ReadFile(job_mem) - if err == nil { - x, err := strconv.ParseInt(string(mem_usage), 0, 64) - if err == nil { - jobdata.MemoryUsage = x - } - } - job_mem = filepath.Join(memory_base, userdir, jobdir, stepdir, "memory.max_usage_in_bytes") - mem_usage, err = os.ReadFile(job_mem) - if err == nil { - x, err := strconv.ParseInt(string(mem_usage), 0, 64) - if err == nil { - jobdata.MaxMemoryUsage = x - } - } - job_cpu := filepath.Join(cpuacct_base, userdir, jobdir, stepdir, "cpuacct.usage") - total_usage, err := os.ReadFile(job_cpu) - if err == nil { - tu, err := 
strconv.ParseInt(string(total_usage), 0, 64) - if err == nil { - job_cpu = filepath.Join(cpuacct_base, userdir, jobdir, stepdir, "cpuacct.usage_user") - user_usage, err := os.ReadFile(job_cpu) - if err == nil { - uu, err := strconv.ParseInt(string(user_usage), 0, 64) - if err == nil { - jobdata.CpuUsageUser = int64(uu/tu) * 100 - jobdata.CpuUsageSys = 100 - jobdata.CpuUsageUser - } - } - } - } - return jobdata, nil -} - -func pathinfo(path string) (uint64, uint64, string, error) { - uid := uint64(0) - jobid := uint64(0) - step := "" - - parts := strings.Split(path, "/") - for i := len(parts) - 1; i >= 0; i-- { - p := parts[i] - if strings.HasPrefix(p, "uid_") { - u, err := getId("uid", p) - if err == nil { - uid = u - } - } else if strings.HasPrefix(p, "job_") { - j, err := getId("job", p) - if err == nil { - jobid = j - } - } else if strings.HasPrefix(p, "step_") { - step = p[5:] - } - } - - return uid, jobid, step, nil -} - -func (m *SlurmJobDetector) CheckEvents(timestamp time.Time) { - globPattern := filepath.Join(cpuacct_base, "uid_[0-9]*", "job_[0-9]*") - if m.config.SendStepEvents { - globPattern = filepath.Join(cpuacct_base, "uid_[0-9]*", "job_[0-9]*", "step_*") - } - dirs, err := filepath.Glob(globPattern) - if err != nil { - cclog.ComponentError(m.name, "Cannot glob with pattern", globPattern) - return - } - for _, d := range dirs { - if _, ok := m.files[d]; !ok { - uid := uint64(0) - jobid := uint64(0) - step := "" - uid, jobid, step, err = pathinfo(d) - if err == nil { - if len(step) == 0 { - cclog.ComponentDebug(m.name, "New job for UID ", uid, " and JOBID ", jobid) - m.NewJobEvent(uint64(uid), uint64(jobid), timestamp, m.output) - } else { - cclog.ComponentDebug(m.name, "New job step for UID ", uid, ", JOBID ", jobid, " and step ", step) - m.NewJobStepEvent(uint64(uid), uint64(jobid), step, timestamp, m.output) - } - } - m.files[d] = struct{}{} - } - } - for d := range m.files { - if _, ok := stringArrayContains(dirs, d); !ok { - uid := uint64(0) - jobid := uint64(0) - step := "" - uid, jobid, step, err = pathinfo(d) - if err == nil { - if len(step) == 0 { - cclog.ComponentDebug(m.name, "Vanished job for UID ", uid, " and JOBID ", jobid) - m.EndJobEvent(uint64(uid), uint64(jobid), timestamp, m.output) - } else { - cclog.ComponentDebug(m.name, "Vanished job step for UID ", uid, ", JOBID ", jobid, " and step ", step) - m.EndJobStepEvent(uint64(uid), uint64(jobid), step, timestamp, m.output) - } - } - delete(m.files, d) - } - } -} - -func (m *SlurmJobDetector) NewJobEvent(uid, jobid uint64, timestamp time.Time, output chan lp.CCMetric) { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - } - userdir := fmt.Sprintf("uid_%d", uid) - jobdir := fmt.Sprintf("job_%d", jobid) - - // Fill job JSON with data from cgroup - var md SlurmJobMetadata - job_cpus_file := filepath.Join(cpuset_base, userdir, jobdir, "cpuset.effective_cpus") - cclog.ComponentDebug(m.name, job_cpus_file) - job_cpus, err := os.ReadFile(job_cpus_file) - if err == nil { - cclog.ComponentDebug(m.name, string(job_cpus)) - md.Cpus = ExpandList(string(job_cpus)) - } - job_mems_file := filepath.Join(cpuset_base, userdir, jobdir, "cpuset.effective_mems") - job_mems, err := os.ReadFile(job_mems_file) - if err == nil { - md.Memories = ExpandList(string(job_mems)) - } - job_devs_file := filepath.Join(devices_base, userdir, jobdir, "devices.list") - job_devs, err := os.ReadFile(job_devs_file) - if err == nil { - md.Devices = ParseDevices(string(job_devs)) - } - job_mem_limit_hard_file := 
filepath.Join(memory_base, userdir, jobdir, "memory.limit_in_bytes") - job_mem_limit_hard, err := os.ReadFile(job_mem_limit_hard_file) - if err == nil { - x, err := strconv.ParseInt(string(job_mem_limit_hard), 0, 64) - if err == nil { - md.MemoryLimitHard = uint64(x) - } - } - job_mem_limit_soft_file := filepath.Join(memory_base, userdir, jobdir, "memory.soft_limit_in_bytes") - job_mem_limit_soft, err := os.ReadFile(job_mem_limit_soft_file) - if err == nil { - x, err := strconv.ParseInt(string(job_mem_limit_soft), 0, 64) - if err == nil { - md.MemoryLimitSoft = uint64(x) - } - } - md.UID = uid - md.JobId = jobid - md.Timestamp = uint64(timestamp.Unix()) - md.Status = "start" - jobjson, err := json.Marshal(md) - if err == nil { - y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) - if err == nil { - suid := fmt.Sprintf("%d", uid) - y.AddMeta("uid", suid) - uname, err := osuser.LookupId(suid) - if err == nil { - y.AddMeta("username", uname.Username) - } - y.AddMeta("metric_type", "event") - output <- y - } - } -} - -func (m *SlurmJobDetector) NewJobStepEvent(uid, jobid uint64, step string, timestamp time.Time, output chan lp.CCMetric) { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - "stype": "step", - "stype-id": step, - } - userdir := fmt.Sprintf("uid_%d", uid) - jobdir := fmt.Sprintf("job_%d", jobid) - stepdir := fmt.Sprintf("step_%s", step) - - // Fill job JSON with data from cgroup - var md SlurmJobMetadata - job_cpus_file := filepath.Join(cpuset_base, userdir, jobdir, stepdir, "cpuset.effective_cpus") - job_cpus, err := os.ReadFile(job_cpus_file) - if err == nil { - md.Cpus = ExpandList(string(job_cpus)) - } - job_mems_file := filepath.Join(cpuset_base, userdir, jobdir, stepdir, "cpuset.effective_mems") - job_mems, err := os.ReadFile(job_mems_file) - if err == nil { - md.Memories = ExpandList(string(job_mems)) - } - job_devs_file := filepath.Join(devices_base, userdir, jobdir, stepdir, "devices.list") - job_devs, err := os.ReadFile(job_devs_file) - if err == nil { - md.Devices = ParseDevices(string(job_devs)) - } - job_mem_limit_hard_file := filepath.Join(memory_base, userdir, jobdir, stepdir, "memory.limit_in_bytes") - job_mem_limit_hard, err := os.ReadFile(job_mem_limit_hard_file) - if err == nil { - x, err := strconv.ParseInt(string(job_mem_limit_hard), 0, 64) - if err == nil { - md.MemoryLimitHard = uint64(x) - } - } - job_mem_limit_soft_file := filepath.Join(memory_base, userdir, jobdir, stepdir, "memory.soft_limit_in_bytes") - job_mem_limit_soft, err := os.ReadFile(job_mem_limit_soft_file) - if err == nil { - x, err := strconv.ParseInt(string(job_mem_limit_soft), 0, 64) - if err == nil { - md.MemoryLimitSoft = uint64(x) - } - } - md.UID = uid - md.JobId = jobid - md.Step = step - md.Timestamp = uint64(timestamp.Unix()) - md.Status = "start" - jobjson, err := json.Marshal(md) - if err == nil { - y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) - if err == nil { - suid := fmt.Sprintf("%d", uid) - y.AddMeta("uid", suid) - uname, err := osuser.LookupId(suid) - if err == nil { - y.AddMeta("username", uname.Username) - } - y.AddMeta("metric_type", "event") - output <- y - } - } -} - -func (m *SlurmJobDetector) EndJobEvent(uid, jobid uint64, timestamp time.Time, output chan lp.CCMetric) { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - } - - // Fill job JSON with data from cgroup - var md 
SlurmJobMetadata - md.UID = uid - md.JobId = jobid - md.Timestamp = uint64(timestamp.Unix()) - md.Status = "end" - jobjson, err := json.Marshal(md) - if err == nil { - y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) - if err == nil { - suid := fmt.Sprintf("%d", uid) - y.AddMeta("uid", suid) - uname, err := osuser.LookupId(suid) - if err == nil { - y.AddMeta("username", uname.Username) - } - y.AddMeta("metric_type", "event") - output <- y - } - } -} - -func (m *SlurmJobDetector) EndJobStepEvent(uid, jobid uint64, step string, timestamp time.Time, output chan lp.CCMetric) { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - "stype": "step", - "stype-id": step, - } - - // Fill job JSON with data from cgroup - var md SlurmJobMetadata - md.UID = uid - md.JobId = jobid - md.Step = step - md.Timestamp = uint64(timestamp.Unix()) - md.Status = "end" - jobjson, err := json.Marshal(md) - if err == nil { - y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp) - if err == nil { - suid := fmt.Sprintf("%d", uid) - y.AddMeta("uid", suid) - uname, err := osuser.LookupId(suid) - if err == nil { - y.AddMeta("username", uname.Username) - } - y.AddMeta("metric_type", "event") - output <- y - } - } -} - -func (m *SlurmJobDetector) SendMetrics(jobtags map[string]string, jobmetrics SlurmJobMetrics, timestamp time.Time, output chan lp.CCMetric) { - - y, err := lp.New("mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MemoryUsage}, timestamp) - if err == nil { - y.AddMeta("unit", "Bytes") - output <- y - } - y, err = lp.New("max_mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MaxMemoryUsage}, timestamp) - if err == nil { - y.AddMeta("unit", "Bytes") - output <- y - } - y, err = lp.New("user_cpu", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageUser}, timestamp) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - y, err = lp.New("user_sys", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageSys}, timestamp) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } -} - -// Read collects all metrics belonging to the sample collector +// Read collects all metrics belonging to the SlurmJobDetector collector // and sends them through the output channel to the collector manager func (m *SlurmJobDetector) Read(interval time.Duration, output chan lp.CCMetric) { // Create a sample metric timestamp := time.Now() - // Capture output channel + // Capture output channel for event sending in goroutine, so at startup, the event checking + // waits until the first call of Read() m.output = output - udirs, err := filepath.Glob(filepath.Join(cpuacct_base, "uid_[0-9]*")) + // This is the reading for metrics for all running jobs. 
For the event checking, see + the goroutine in Init() + + parts := make([]string, 0) + parts = append(parts, cpuacct_base) + // Only cgroup/v1 has a uid_* folder + if m.config.CgroupVersion == "v1" { + parts = append(parts, "uid_[0-9]*") + } + parts = append(parts, "job_[0-9]*") + // Get folders based on constructed glob path + dirs, err := filepath.Glob(filepath.Join(parts...)) if err != nil { + cclog.ComponentError(m.name, "Cannot get directory list for SLURM jobs") return } - - for _, ud := range udirs { - jdirs, err := filepath.Glob(filepath.Join(ud, "job_[0-9]*")) + if m.config.SendStepMetrics { + // Add step lookup if we process step metrics + parts = append(parts, "step_*") + // Get step folders based on constructed glob path + sdirs, err := filepath.Glob(filepath.Join(parts...)) if err != nil { - continue - } - uKey := filepath.Base(ud) - - for _, jd := range jdirs { - jKey := filepath.Base(jd) - jobid, err := getId("job", jKey) - if err != nil { - continue - } - jobmetrics, err := ReadJobData(uKey, jKey) - if err != nil { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - } - m.SendMetrics(jobtags, jobmetrics, timestamp, output) - } - if m.config.SendStepMetrics { - sdirs, err := filepath.Glob(filepath.Join(jd, "step_*")) - if err != nil { - continue - } - for _, sd := range sdirs { - sKey := filepath.Base(sd) - stepmetrics, err := ReadJobStepData(uKey, jKey, sKey) - if err != nil { - continue - } - var stepname string - _, err = fmt.Sscanf(sKey, "step_%s", &stepname) - if err == nil { - jobtags := map[string]string{ - "type": "job", - "type-id": fmt.Sprintf("%d", jobid), - "stype": "step", - "stype-id": stepname, - } - m.SendMetrics(jobtags, stepmetrics, timestamp, output) - } - } - - } + cclog.ComponentError(m.name, "Cannot get directory list for SLURM steps") + return } + // Add step folders to directory list for processing + dirs = append(dirs, sdirs...)
} - // uid_pattern := "uid_[0-9]*" - // job_pattern := "job_[0-9]*" - // //step_pattern := "step_*" + for _, d := range dirs { + dirParts := GetPathParts(d) // Gets uid_*, job_* and step_* (if available) + uid, jobid, step := GetIdsFromParts(dirParts) // extracts the IDs from the available parts - // globPattern := filepath.Join(cpuacct_base, uid_pattern) - // uidDirs, err := filepath.Glob(globPattern) - // if err != nil { - // return - // } - // for _, udir := range uidDirs { - // uKey := filepath.Base(udir) - // if _, ok := m.jobs[uKey]; !ok { - // m.jobs[uKey] = make(map[string]SlurmJobData) - // } - // uid, _ := getId("uid", uKey) - // globPattern = filepath.Join(cpuacct_base, uKey, job_pattern) - // jobDirs, err := filepath.Glob(globPattern) - // if err != nil { - // continue - // } - // for _, jdir := range jobDirs { - // jKey := filepath.Base(jdir) - // jobid, _ := getId("job", jKey) - // if _, ok := m.jobs[uKey][jKey]; !ok { - // var steps []SlurmJobStepData = nil - // if m.config.SendStepEvents || m.config.SendStepMetrics { - // steps = make([]SlurmJobStepData, 0) - // } - // m.jobs[uKey][jKey] = SlurmJobData{ - // Metrics: SlurmJobMetrics{ - // MemoryUsage: 0, - // MaxMemoryUsage: 0, - // LimitMemoryUsage: 0, - // CpuUsageUser: 0, - // CpuUsageSys: 0, - // }, - // Steps: steps, - // } - // m.NewJobEvent(uid, jobid, timestamp, output) - // } - // jdata := m.jobs[uKey][jKey] + // Create tags map for the job + jobtags := map[string]string{ + "type": "job", + "type-id": jobid, + } + // Create meta map for the job + jobmeta := make(map[string]string) - // jobmetrics, err := ReadJobData(uKey, jKey) - // if err == nil { - - // jdata.Metrics = jobmetrics - - // m.SendMetrics(jobid, jobmetrics, timestamp, output) - // } - // m.jobs[uKey][jKey] = jdata - // } - // } - - // for uKey, udata := range m.jobs { - // uid, _ := getId("uid", uKey) - // for jKey := range udata { - // jobid, _ := getId("job", jKey) - // p := filepath.Join(cpuset_base, uKey, jKey) - // if _, err := os.Stat(p); err != nil { - // m.EndJobEvent(uid, jobid, timestamp, output) - // delete(udata, jKey) - // } - // } - // p := filepath.Join(cpuset_base, uKey) - // if _, err := os.Stat(p); err != nil { - // delete(udata, uKey) - // } - // } + // if cgroup/v1, we have a uid + if len(uid) > 0 { + jobmeta["uid"] = uid + uname, err := osuser.LookupId(uid) + if err == nil { + jobmeta["username"] = uname.Username + } + } + // if this is a step directory, add the sub type with value + if len(step) > 0 { + jobtags["stype"] = "step" + jobtags["stype-id"] = step + } + jobmetrics, err := m.ReadMetrics(dirParts) + if err == nil { + // Send all metrics for the job + m.SendMetrics(jobtags, jobmeta, jobmetrics, timestamp, output) + } + } } // Close metric collector: close network connection, close files, close libraries, ... diff --git a/collectors/slurmJobDetector.md b/collectors/slurmJobDetector.md index 4315feb..d1819d2 100644 --- a/collectors/slurmJobDetector.md +++ b/collectors/slurmJobDetector.md @@ -7,6 +7,7 @@ "send_job_metrics" : true, "send_step_events": false, "send_step_metrics" : false, + "cgroup_version" : "v1" } ``` @@ -20,9 +21,13 @@ Options: * `send_job_metrics`: Send metrics of each running job with the global collector interval * `send_step_events`: Send events when a job step starts * `send_step_metrics`: Send metrics of each job step with the global collector interval +* `cgroup_version`: Which cgroup version is in use. Possible values are `v1` and `v2`.
`v1` is the default. +* `sysfs_base`: (Testing only) Set the base path for lookups, default `/sys/fs/cgroup`. + +For `cgroup_version = v2`, the collector searches for jobs at `/system.slice/slurmstepd.scope` below the configured `sysfs_base` (default `/sys/fs/cgroup`). If the cgroup folders are created below `/sys/fs/cgroup/unified`, adjust the `sysfs_base` option to `/sys/fs/cgroup/unified`. ## Testing -For testing the collector, you can specifiy a different base directory that should be checked for new events. The default is `/sys/fs/cgroup/`. By specifying a `sysfs_base` in the configuration, this can be changed. Moreover, with the `slurmJobDetector_dummy.sh`, you can create and delete "jobs" for testing. Use the same directory with `--basedir` +For testing the collector, you can specify a different base directory that should be checked for new events. The default is `/sys/fs/cgroup/`. By specifying a `sysfs_base` in the configuration, this can be changed. Moreover, with the `slurmJobDetector_dummy.sh` script, you can create and delete "jobs" for testing. Use the same directory with `--basedir`. It generates only cgroup/v1 directory structures at the moment; the expected layouts are sketched in the listing below. ```sh $ slurmJobDetector_dummy.sh -h
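+# Note (editorial sketch, not part of the script output): the directory layouts the collector
+# globs for, derived from the paths constructed in slurmJobDetector.go; the <...> placeholders are illustrative.
+#   cgroup/v1: <sysfs_base>/cpuacct/slurm/uid_<uid>/job_<jobid>[/step_<step>]
+#              (with matching trees below memory/, cpuset/ and devices/)
+#   cgroup/v2: <sysfs_base>/system.slice/slurmstepd.scope/job_<jobid>[/step_<step>]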