Source code for woom.job

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Job management utilities
"""

import datetime
import json
import logging
import os
import shlex
import subprocess
import time
from enum import Enum

import psutil

from . import util as wutil
from .__init__ import WoomError, woom_warn

# from .env import is_os_cmd_avail

# Scheduler names accepted by BackgroundJobManager.from_scheduler(); each name is
# mapped to the "<Name>JobManager" class of this module (e.g. "slurm" -> SlurmJobManager).
ALLOWED_SCHEDULERS = ["background", "slurm", "pbspro"]

# Module-level logger shared by the job and job manager classes below
logger = logging.getLogger(__name__)


class WoomJobError(WoomError):
    """Error raised by job and job manager operations"""
class JobStatus(Enum):
    """State of a job

    Strictly negative values are states in which the job is no longer
    running, zero means that the state is unknown and strictly positive
    values are states in which the job is active.

    .. note:: Enum members are singletons: the :attr:`jobid` attached to a
        member is shared by every reference to that member.
    """

    SKIPPED = -8
    TERMINATED = -7
    FAILED = -6
    ERROR = -5
    SUCCESS = -4
    KILLED = -3
    NOTSUBMITTED = -2
    FINISHED = -1
    UNKNOWN = 0
    PENDING = 1
    RUNNING = 2
    INQUEUE = 3
    EXITING = 4
    COMPLETING = 5

    def is_running(self):
        """Whether the job is active (strictly positive value)"""
        return 0 < self.value

    def is_not_running(self):
        """Whether the job is no longer running (strictly negative value)"""
        return 0 > self.value

    def is_unknown(self):
        """Whether the state is unknown (null value)"""
        return not self.value

    def has_been_canceled(self):
        """Whether the job was killed or terminated"""
        return self in (JobStatus.KILLED, JobStatus.TERMINATED)

    def has_failed(self):
        """Whether the job ended in error, failure or was killed"""
        return self in (JobStatus.ERROR, JobStatus.FAILED, JobStatus.KILLED)

    @property
    def jobid(self):
        """Job identifier attached to this status, as a string ("" when unset)"""
        try:
            return self._jobid
        except AttributeError:
            self._jobid = ""
            return self._jobid

    @jobid.setter
    def jobid(self, jobid):
        self._jobid = str(jobid)
# %% Background processes
class Job:
    """Single job

    Parameters
    ----------
    manager : JobManager
        Job manager instance
    name : str
        Job name
    script : str
        Path to job script
    args : list
        Command line arguments
    queue : str, optional
        Queue name
    jobid : str, optional
        Job identifier
    submission_date : str, optional
        Date of submission
    status : str or JobStatus, optional
        Job status
    subproc : subprocess.Popen, optional
        Subprocess object for background jobs
    artifacts : dict, optional
        Job artifacts
    submission_dir : str, optional
        Submission directory path
    blocking : bool, optional
        Whether job is blocking
    """

    # Column name -> width used by get_overview_header() and get_overview()
    overview_format = dict(
        name="20",
        jobid="8",
        queue="10",
        realqueue="10",
        time="5",
        status="10",
        submission_date="20",
        # token="70",
        script="120",
    )

    def __init__(
        self,
        manager,
        name,
        script,
        args,
        queue=None,
        jobid=None,
        submission_date=None,
        status="UNKNOWN",
        subproc=None,
        artifacts=None,
        submission_dir=None,
        blocking=True,
    ):
        # NOTE: the assignment order defines the key order of to_dict()/dump()
        self.manager = manager
        self.name = name
        self.queue = queue
        self.jobid = jobid
        self.script = script
        self.args = args
        self.realqueue = None
        self.time = None
        self.memory = None
        # strftime always yields "YYYY-MM-DD HH:MM:SS", whereas slicing str(now())
        # truncated the seconds whenever the microseconds happened to be zero
        self.submission_date = (
            submission_date if submission_date else datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )
        if script is not None and submission_dir is None:
            submission_dir = os.path.dirname(script)
        self.submission_dir = submission_dir
        self.subproc = subproc
        if isinstance(status, str):
            status = JobStatus[status]
        self.status = status
        self.status.jobid = jobid
        self.artifacts = artifacts
        self.blocking = blocking

    @classmethod
    def load(cls, manager, json_file, append=True):
        """Load a job into a manager from a json file

        Parameters
        ----------
        manager: BackgroundJobManager
            Manager whose class name must match the one stored in the file
        json_file: str
            Path to a file written by :meth:`dump`
        append: bool
            Append the job to ``manager.jobs`` if its id is not already there

        Raises
        ------
        WoomJobError
            When the file was dumped by another kind of manager
        """
        with open(json_file) as jsonf:
            content = json.load(jsonf)
        if manager.__class__.__name__ != content["manager"]:
            raise WoomJobError(f"Cannot load this job in a {manager.__class__.__name__} manager: {json_file}")
        job = cls(
            manager=manager,
            name=content["name"],
            script=content["script"],
            args=content["args"],
            # to_dict() drops empty strings, so these keys may be missing
            jobid=content.get("jobid"),
            queue=content.get("queue"),
            status=content["status"],
            submission_date=content["submission_date"],
            artifacts=content.get("artifacts"),
            blocking=content.get("blocking", True),
        )
        if append and job.jobid not in manager:
            manager.jobs.append(job)
        return job

    @staticmethod
    def _format_elapsed(elapsed):
        """Format a :class:`datetime.timedelta` as ``HHhMM`` (``--h--`` when None)"""
        if elapsed is None:
            return "--h--"
        hours = elapsed.seconds // 3600
        minutes = (elapsed.seconds - hours * 3600) // 60
        return f"{hours:02}h{minutes:02}"

    def to_dict(self):
        """Export the job attributes as a json-friendly dict

        Empty strings are skipped, the manager is replaced by its class name,
        the status by its name and the elapsed time by a ``HHhMM`` string.
        """
        dict_job = {}
        for key, value in self.__dict__.items():
            if isinstance(value, str):
                if value != "":
                    dict_job[key] = value
            elif key == "manager":
                dict_job[key] = value.__class__.__name__
            elif key == "status":
                dict_job[key] = str(value.name)
            elif key == "time":
                dict_job[key] = self._format_elapsed(value)
            else:
                dict_job[key] = value
        return dict_job

    def dump(self, json_file=None):
        """Export to json in job script directory

        Return
        ------
        str
            Path of the written json file
        """
        jobdict = self.to_dict()
        if json_file is None:
            json_file = self.files["json"]
        with open(json_file, "w") as f:
            json.dump(jobdict, f, indent=4, cls=wutil.WoomJSONEncoder)
            json_path = f.name
        return json_path

    def __str__(self):
        return self.jobid

    def __repr__(self):
        return "<Job(name={}, status={}, jobid={}, script={})>".format(
            self.name, self.status.name, self.jobid, self.script
        )

    def __eq__(self, job):
        # Jobs (or job ids) are compared through their string form, i.e. their id
        return str(self) == str(job)

    def __hash__(self):
        return hash(str(self))

    @property
    def files(self):
        """:class:`dict` of job files like script, status, out, err and json"""
        submdir = os.path.dirname(self.script)
        return {
            "script": self.script,
            "status": os.path.join(submdir, "job.status"),
            "err": os.path.join(submdir, "job.err"),
            "out": os.path.join(submdir, "job.out"),
            "json": os.path.join(submdir, "job.json"),
            "terminating": os.path.join(submdir, "job.terminating"),
        }

    def _get_proc_(self):
        """Get the :class:`psutil.Process` matching the job id"""
        if isinstance(self.jobid, subprocess.Popen):
            pid = self.jobid.pid
        else:
            pid = self.jobid
        return psutil.Process(int(pid))

    def query_status(self):
        """Query the status

        .. warning:: It does not update the status! It is just a query.
        """
        try:
            proc = self._get_proc_()
            status = JobStatus.RUNNING
            status.jobid = str(proc.pid)
        except psutil.NoSuchProcess:
            status = JobStatus.UNKNOWN
            status.jobid = self.jobid
        return status

    def get_status(self, fallback=None):
        """Query and set the status of this job

        Canceled jobs (killed or terminated) are not queried again.
        """
        if self.status.has_been_canceled():
            # don't query in this case
            return self.status
        return self.set_status(self.query_status(), fallback=fallback)

    def set_status(self, status, fallback=None):
        """Set the status of this job without query

        Don't update with unknown state if the job is supposed to be finished.

        Parameters
        ----------
        status: str, JobStatus, dict
            A :class:`JobStatus` name, a :class:`JobStatus` member, or a dict
            with the ``jobid``, ``queue``, ``status`` and ``time`` keys, in which
            case :attr:`realqueue` and :attr:`time` are updated too.
        fallback:
            Currently unused.

        Return
        ------
        JobStatus
        """
        if isinstance(status, str):
            status = JobStatus[status.upper()]
        if not isinstance(status, JobStatus):  # dict
            assert self.jobid == status["jobid"]
            self.realqueue = status["queue"]
            self.time = status["time"]
            status = status["status"]
        status.jobid = self.jobid
        if status.is_unknown() and self.status.is_not_running():
            return self.status
        self.status = status
        self.dump()
        return self.status

    def is_running(self):
        """Whether the job process exists and is running"""
        try:
            p = self._get_proc_()
            return p.is_running()
        except psutil.NoSuchProcess:
            return False

    def kill(self, graceful=True, timeout=10):
        """Kill the job, optionally trying graceful termination first

        Parameters
        ----------
        graceful: bool
            If True, send SIGTERM first and wait for graceful shutdown
        timeout: int
            Seconds to wait for graceful shutdown before forcing SIGKILL
        """
        if not self.is_running():
            return
        proc = self._get_proc_()
        if graceful:
            # Try graceful termination first
            logger.debug(f"Sending SIGTERM to process: {proc.pid}")
            proc.terminate()  # Sends SIGTERM
            try:
                # Wait for the process to terminate gracefully
                proc.wait(timeout=timeout)
                logger.debug(f"Process {proc.pid} terminated gracefully")
                # The script's on_term handler wrote status 0
                self.set_status("SUCCESS")
                return
            except psutil.TimeoutExpired:
                logger.warning(f"Process {proc.pid} did not terminate gracefully, forcing kill")

        # Force kill with SIGKILL
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            # The process exited on its own in the meantime
            logger.debug(f"Process {proc.pid} already gone before SIGKILL")

        # Since SIGKILL can't be trapped, we must write the status file ourselves
        status_file = self.files["status"]
        exit_code = "0" if graceful else "1"
        logger.debug(f"Writing exit status to {status_file}: {exit_code}")
        with open(status_file, "w") as f:
            f.write(exit_code)
        self.set_status("TERMINATED" if graceful else "KILLED")

    def wait(self):
        """Wait for a job to finish

        Return
        ------
        int, None
            Exit status returned by the process wait, or None when the job
            is not running.
        """
        if not self.is_running():
            # Previously this path returned an unbound local variable
            return None
        p = self._get_proc_()
        logger.debug(f"Waiting for process to finish: {p.pid}")
        exit_status = p.wait()
        if exit_status:
            logger.error(f"Finished with exit status: {exit_status}")
        else:
            logger.debug("Ok, finished!")
        return exit_status

    @classmethod
    def get_overview_header(cls):
        """Get the two-line header (names and dashes) of the overview table"""
        heads = []
        tails = []
        for name, fmt in cls.overview_format.items():
            name = name.upper()
            heads.append(f"{name:{fmt}}")
            tails.append("-" * len(heads[-1]))
        return " ".join(heads) + "\n" + " ".join(tails)

    def get_overview(self, update=True):
        """Get a single row of the overview table for this job

        Parameters
        ----------
        update: bool
            Query and set the job status first with :meth:`get_status`

        Return
        ------
        str
            Values padded to the widths of :attr:`overview_format`
        """
        # TODO: Must be converted to use pandas
        if update:
            self.get_status()
        values = {
            "name": self.name,
            "jobid": self.jobid,
            "queue": self.queue,
            "realqueue": self.realqueue,
            "time": self._format_elapsed(self.time),
            "status": self.status.name,
            "submission_date": self.submission_date,
            "script": self.script,
        }
        return " ".join(f"{values[key]!s:{width}}" for key, width in self.overview_format.items())
class BackgroundJobManager(object):
    """Manager for jobs that run in background"""

    with_scheduler = False
    # command name -> {"command": executable, "options": {option name: format}}
    commands = {
        "submit": {
            "command": "bash",
            "options": {
                "script": "{}",
                "log_out": "-o {}",
            },
        },
    }
    status_names = {
        "F": JobStatus.FINISHED,
    }
    job_class = Job

    def __init__(self):
        self.jobs = []
        logger.info(f"Started job manager: {self.__class__.__name__}()")

    def load_job(self, json_file, append=True):
        """Load a single job from its json dump file"""
        return self.job_class.load(self, json_file, append)

    def load(self, json_files):
        """Load jobs from json dump files"""
        for json_file in json_files:
            self.load_job(json_file, append=True)

    def dump(self):
        """Store jobs to json files"""
        for job in self.jobs:
            job.dump()

    @staticmethod
    def from_scheduler(scheduler):
        """Instantiate the job manager of a scheduler

        Parameters
        ----------
        scheduler: str
            One of :data:`ALLOWED_SCHEDULERS` (case insensitive)
        """
        scheduler = scheduler.lower()
        assert scheduler in ALLOWED_SCHEDULERS, f"Invalid scheduler: {scheduler}. Valid: {ALLOWED_SCHEDULERS}"
        cls_name = scheduler.title() + "JobManager"
        from . import job

        return getattr(job, cls_name)()

    def get_job(self, jobid):
        """Get :class:`Job` from id

        Return
        ------
        Job, None
            None when `jobid` is None or matches no job
        """
        # Check for None before the str conversion, which would turn it into "None"
        if jobid is None:
            return None
        jobid = str(jobid)
        for job in self.jobs:
            if job.jobid == jobid:
                return job
        return None

    def get_jobs(self, jobids=None, name=None, queue=None):
        """Get jobs

        Parameters
        ----------
        jobids: list(str), str, None
            Explicit list of job ids. `name` and `queue` are ignored when `jobids` is used.
        name: str, None
            Select jobs from name (case insensitive)
        queue: str, None
            Select jobs from queue

        Return
        ------
        list(Job)
            List of :class:`Job` objects
        """
        if jobids:
            if not isinstance(jobids, list):
                jobids = [jobids]
            wanted = [str(jobid) for jobid in jobids]
            return [job for job in self.jobs for jobid in wanted if job.jobid == jobid]
        if name:
            return [job for job in self.jobs if job.name.lower() == name.lower()]
        if queue:
            return [job for job in self.jobs if job.queue == queue]
        return list(self.jobs)

    def get_status(self, jobids=None, name=None, queue=None, fallback=None):
        """Update and return jobs status

        Return
        ------
        list(JobStatus)
        """
        jobs = self.get_jobs(jobids=jobids, name=name, queue=queue)
        return [job.get_status(fallback=fallback) for job in jobs]

    def set_status(self, jobids=None, name=None, queue=None, fallback=None, status=None):
        """Set the status of the selected jobs

        Parameters
        ----------
        status: str, JobStatus, None
            Status to set on every selected job. When None, each job is
            queried with :meth:`Job.query_status` first.

        Return
        ------
        list(JobStatus)
        """
        jobs = self.get_jobs(jobids=jobids, name=name, queue=queue)
        return [
            job.set_status(job.query_status() if status is None else status, fallback=fallback) for job in jobs
        ]

    def get_overview(self, jobids=None, name=None, queue=None):
        """Update the status of the selected jobs and return them as a text table"""
        jobs = self.get_jobs(jobids=jobids, name=name, queue=queue)
        for job in jobs:
            job.get_status()
        header = self.job_class.get_overview_header()
        overviews = [job.get_overview(update=False) for job in jobs]
        return header + "\n" + "\n".join(overviews)

    def check_status(self, show=True):
        """Update jobs status and show them"""
        overview = self.get_overview()
        if show:
            print(overview)

    def __contains__(self, job):
        return self.get_job(job) is not None

    def __getitem__(self, jobid):
        return self.get_job(jobid)

    def drop(self, job):
        """Remove a job (or the job of a given id) from the manager

        Raises
        ------
        WoomJobError
            When the job is not managed
        """
        for j in list(self.jobs):
            if j == job:
                self.jobs.remove(j)
                return
        raise WoomJobError(f"Can't drop job from manager: {job}")

    def __delitem__(self, jobid):
        self.drop(jobid)

    def __str__(self):
        return self.__class__.__name__

    def __repr__(self):
        return f"<{self}>"

    @classmethod
    def get_command_args(cls, command, **opts):
        """Convert commandline specifications and values to a list of arguments

        Parameters
        ----------
        command: str
            A valid key of the :attr:`commands` dictionary attribute
        opts: dict
            Values used to fill patterns defined in the :attr:`commands` attribute.
            Unknown options and None values are ignored, booleans act as flags
            that are only emitted when True, and list values emit the pattern
            once per non-empty item.

        Return
        ------
        list
        """
        spec = cls.commands[command]
        args = []
        if "command" in spec:
            args.append(spec["command"])
        options = spec.get("options", {})
        for oname, ovalue in opts.items():
            if oname not in options or ovalue is None:
                continue
            fmt = options[oname]
            if isinstance(ovalue, bool):
                # A False flag must not be emitted (e.g. force=False for a graceful kill)
                if ovalue:
                    args += shlex.split(fmt)
            elif isinstance(ovalue, list):
                for val in ovalue:
                    if val:
                        args += shlex.split(fmt.format(val))
            else:
                args += shlex.split(fmt.format(ovalue))
        return args

    def get_submission_command(self, script, opts, depend=None):
        """Build the submission command line

        .. note:: `opts` is updated in place with the script path.
        """
        # Finalize options
        opts["script"] = script
        # A dict "extra" option is the deprecated way to pass more submission options.
        # Non-dict values are left untouched since some schedulers accept raw flags.
        if isinstance(opts.get("extra"), dict):
            woom_warn("The 'extra' submission option is deprecated", "deprecation")
            opts.update(opts.pop("extra"))
        # Format commandline arguments
        return self.get_command_args("submit", **opts)

    def create_job(self, script=None, name=None, args=None, fake=False, **kwargs):
        """Quickly create a job instance and add it to the manager

        `fake` is accepted for backward compatibility and is not used.
        """
        if args is None:
            args = []
        job = self.job_class(manager=self, script=script, name=name, args=args, **kwargs)
        self.jobs.append(job)
        return job

    def submit(
        self, script, opts, depend=None, submdir=None, stdout=None, stderr=None, artifacts=None, blocking=True
    ):
        """Run the script in background and register the new job

        Parameters
        ----------
        script: str
            Path to the script
        opts: dict
            Submission options, see :meth:`get_submission_command`
        depend: list(Job), None
            Jobs that are waited for first; nothing is submitted if one failed
        submdir: str, None
            Working directory, defaults to the script directory
        stdout, stderr: file, None
            Output streams; when None, ``<script root>.out`` and ``.err`` files
            are opened and closed here

        Return
        ------
        Job, None
            None when a dependency failed
        """
        # Wait for dependencies
        if depend:
            for job in depend:
                status = job.wait()
                if status:
                    logger.error(f"Can't submit job because one of the parent job failed: {job}")
                    return None

        # Get submission arguments
        jobargs = self.get_submission_command(script, opts, depend=depend)

        # Submission directory = where the script is
        if submdir is None:
            submdir = os.path.dirname(script)

        # stdout and stderr: files opened here are closed even if Popen fails,
        # the child process keeps its own copy of the descriptors
        rootname = os.path.splitext(script)[0]
        owned_files = []
        try:
            if stdout is None:
                stdout = open(f"{rootname}.out", "w")
                owned_files.append(stdout)
            if stderr is None:
                stderr = open(f"{rootname}.err", "w")
                owned_files.append(stderr)

            # Submit
            logger.debug("Submit: " + " ".join(jobargs))
            subproc = subprocess.Popen(jobargs, stdout=stdout, stderr=stderr, cwd=submdir)
        finally:
            for fileobj in owned_files:
                fileobj.close()
        logger.debug("Submitted")

        # Init Job instance
        job = self.create_job(
            script=script,
            name=opts.get("name"),
            queue=opts.get("queue"),
            args=subproc.args,
            jobid=str(subproc.pid),
            subproc=subproc,
            artifacts=artifacts,
            submission_dir=submdir,
            blocking=blocking,
        )
        job.dump()
        return job

    def _parse_status_res_(self, res):
        """Log the output of a status command and return its decoded stdout"""
        if res.stderr:
            logger.debug("Job status stderr: " + res.stderr.decode("utf-8", errors="ignore"))
        if res.stdout:
            logger.debug("Job status stdout: " + res.stdout.decode("utf-8", errors="ignore"))
        return res.stdout.decode("utf-8", errors="ignore")

    def kill(self, jobids=None, name=None, queue=None):
        """Kill the selected jobs (all managed jobs when no selector is given)"""
        for job in self.get_jobs(jobids=jobids, name=name, queue=queue):
            job.kill()

    delete = kill
# %% With scheduler
class ScheduledJob(Job):
    """Job whose status and cancellation go through the commands of a scheduler manager"""

    def query_status(self):
        """Query status for a single job

        The active queue is inspected first; when the job is not found there,
        the manager's history query is used if the manager provides one.
        The job's own status is left unchanged.
        """
        base_args = self.manager.get_command_args("status", jobid=self.jobid)
        cmd = self.manager._extra_status_args_(base_args)
        logger.debug("Get status: " + " ".join(cmd))
        # check=False: squeue exits with code 1 once the job has left the active
        # queue (completed/failed). This is not an error: history is tried next.
        completed = subprocess.run(cmd, capture_output=True, check=False)
        logger.debug("Got status")

        # Parse active jobs, only when the scheduler command succeeded
        if completed.returncode == 0:
            entries = self.manager._parse_status_res_(completed)
            if entries:
                return entries[0]["status"]

        # Fallback to history if job not in active queue
        history_query = getattr(self.manager, "_query_history_status_", None)
        if history_query is None:
            return JobStatus.UNKNOWN
        logger.debug("Job not in active queue, checking history")
        return history_query(self.jobid)

    def is_running(self):
        """Query, set and check the status of the job"""
        current = self.get_status()
        return current.is_running()

    def wait(self):
        """Do not wait: dependencies are handled by the scheduler"""
        return None

    def _terminate_gracefully(self, timeout):
        """Ask the scheduler for a SIGTERM, then poll until the job is no longer running

        Return True when the job stopped within `timeout` seconds.
        """
        logger.debug(f"Sending SIGTERM to job: {self.jobid}")
        term_cmd = self.manager.get_command_args("delete", force=False, terminate=True, jobid=self.jobid)
        if subprocess.run(term_cmd, capture_output=True).returncode != 0:
            return False
        started = time.time()
        while time.time() - started < timeout:
            if self.query_status().is_not_running():
                logger.debug(f"Job {self.jobid} terminated gracefully")
                # The status file written by the job is left for the workflow to read
                return True
            time.sleep(1)
        return False

    def kill(self, graceful=True, timeout=10):
        """Kill the job using scheduler commands

        Parameters
        ----------
        graceful: bool
            If True, send SIGTERM and wait before forcing SIGKILL
        timeout: int
            Seconds to wait for graceful shutdown before forcing
        """
        if graceful:
            if self._terminate_gracefully(timeout):
                return
            logger.warning(f"Job {self.jobid} did not terminate gracefully, forcing kill")

        # Force kill with SIGKILL
        logger.debug(f"Forcing kill of job: {self.jobid}")
        kill_cmd = self.manager.get_command_args("delete", force=True, terminate=False, jobid=self.jobid)
        completed = subprocess.run(kill_cmd, capture_output=True, check=True)
        if completed.returncode != 0:
            return

        # SIGKILL won't trigger bash handlers, so the exit status is written here
        status_file = self.files["status"]
        exit_code = "1" if not graceful else "0"
        logger.debug(f"Writing exit status to {status_file}: {exit_code}")
        with open(status_file, "w") as f:
            f.write(exit_code)
        self.set_status("KILLED" if not graceful else "TERMINATED")

    cancel = kill
class _Scheduler_(BackgroundJobManager):
    """Base class of the job managers that submit through a batch scheduler"""

    job_class = ScheduledJob
    with_scheduler = True

    def get_submission_command(self, script, opts, depend=None):
        """Build the submission command, passing dependencies to the scheduler

        The ids of the `depend` jobs are joined with ``:`` into ``opts["depend"]``.
        """
        if depend:
            opts["depend"] = ":".join([str(job) for job in depend])
        return super().get_submission_command(script, opts, depend=depend)

    def submit(
        self, script, opts, depend=None, submdir=None, stdout=None, stderr=None, artifacts=None, blocking=True
    ):
        """Submit the script and instantiate a :class:`Job` object

        Parameters
        ----------
        stdout, stderr: str, None
            Paths of the job logs, used only when ``log_out``/``log_err``
            are not already in `opts`

        Raises
        ------
        WoomJobError
            When the submission command fails
        """
        # stdout and stderr
        # Only set log paths if not already provided by the caller.
        # Using setdefault() preserves absolute paths passed via opts; unconditionally
        # overwriting them caused sbatch to resolve relative names against an unexpected
        # CWD and create spurious log directories.
        rootname = os.path.splitext(os.path.basename(script))[0]
        opts.setdefault("log_out", stdout or f"{rootname}.out")
        opts.setdefault("log_err", stderr or f"{rootname}.err")

        # Submission: the outputs of the submission command itself are captured
        job = super().submit(
            script,
            opts,
            depend=depend,
            submdir=submdir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            artifacts=artifacts,
            blocking=blocking,
        )
        if job is None:  # a dependency failed
            return None

        # communicate() drains both pipes while waiting for the process, whereas
        # calling wait() before reading them can deadlock on large outputs
        out_bytes, err_bytes = job.subproc.communicate()
        out_text = out_bytes.decode("utf-8", errors="ignore")
        err_text = err_bytes.decode("utf-8", errors="ignore")
        logger.debug("Job submit stdout: " + out_text)
        logger.debug("Job submit stderr: " + err_text)
        if job.subproc.returncode:
            raise WoomJobError(f"Submission failed with error message: {err_text}")
        self._parse_submit_job_(job, out_text)  # update jobid
        job.dump()
        return job
class PbsproJobManager(_Scheduler_):
    """Pbspro Job Manager"""

    commands = {
        "submit": {
            "command": "qsub",
            "options": {
                "script": "{}",
                "name": "-N {}",
                "queue": "-V -q {}",
                "nnodes": "-l select={}",
                "ncpus": "-l ncpus={}",
                "ngpus": "-l ngpus={}",
                "memory": "-l mem={}",
                "pmem": "-l pmem={}",
                "time": "-l walltime={}",
                "log_out": "-o {}",
                "log_err": "-e {}",
                "depend": ("-W depend=afterok:{}"),
                "mail": "-M {}",
                "extra": "{}",
            },
        },
        "status": {
            "command": "qstat",
            "options": {
                "jobid": "{}",
                "logname": "-u $LOGNAME",
            },
        },
        "delete": {
            "command": "qdel",
            "options": {
                "force": "-W force",
                "jobid": "{}",
            },
        },
    }
    status_names = {
        "R": JobStatus.RUNNING,
        "F": JobStatus.FINISHED,
        "E": JobStatus.EXITING,
        "Q": JobStatus.INQUEUE,
        "H": JobStatus.PENDING,
        "C": JobStatus.SUCCESS,  # Completed
        "X": JobStatus.SUCCESS,  # Finished successfully
    }
    jobid_sep = " "

    # Keys in the submit opts dict that are not qsub flags
    _non_scheduler_keys = frozenset({"blocking"})

    # Number of header lines preceding the job lines in the qstat output
    _status_header_lines = 5

    @staticmethod
    def _parse_submit_job_(job, stdout):
        """Set the job id from the qsub output, keeping the part before the first dot"""
        job.jobid = stdout.strip().split(".")[0]
        job.status.jobid = job.jobid

    def get_submission_command(self, script, opts, depend=None):
        """Build the qsub command, with two PBS Pro-specific behaviours:

        1. If ``nnodes`` is provided, ``ncpus``, ``memory`` and ``pmem`` are
           merged into the ``-l select=`` directive (PBS Pro forbids these as
           standalone ``-l`` directives alongside ``select``). Otherwise they
           are left as standalone directives.
        2. Any key in *opts* that is not in the known options dict (i.e. keys
           added via ``__many__`` in ``tasks.ini``) is treated as a raw qsub
           flag string and inserted verbatim before the script path.
        """
        # 1. Merge nnodes + ncpus + memory + pmem into a single select directive.
        # Values may arrive as strings when Jinja2 templates are used in tasks.cfg.
        nnodes = opts.get("nnodes")
        if nnodes is not None:
            # Only pop these when they are merged, so that they are not lost otherwise
            ncpus = opts.pop("ncpus", None)
            memory = opts.pop("memory", None)
            pmem = opts.pop("pmem", None)
            select = str(int(nnodes))
            if ncpus is not None:
                ncpus = int(ncpus)
                select += f":ncpus={ncpus}:mpiprocs={ncpus}"
            if memory is not None:
                select += f":mem={memory}"
            if pmem is not None:
                select += f":pmem={pmem}"
            opts["nnodes"] = select

        # 2. Extract __many__ keys: unknown to the scheduler, not internal
        known = set(self.commands["submit"]["options"]) | self._non_scheduler_keys
        extra_raw = []
        for key in list(opts.keys()):
            if key not in known:
                val = opts.pop(key)
                if val is not None:
                    extra_raw += shlex.split(str(val))

        # Build the base command (script path will be last)
        cmd = super().get_submission_command(script, opts, depend=depend)

        # Insert extra raw args just before the script path
        if extra_raw:
            cmd = cmd[:-1] + extra_raw + [cmd[-1]]
        return cmd

    def _extra_status_args_(self, args):
        """Make qstat also report finished jobs (``-x``) of the current user (``-u``)

        The arguments are inserted right after the command name, in place, and
        the list is returned. The user name is resolved here because no shell
        is involved, so a ``$LOGNAME`` string would not be expanded.
        """
        import getpass

        args[1:1] = ["-x", "-u", getpass.getuser()]
        return args

    @staticmethod
    def _parse_elapsed_(elaptime):
        """Convert an elapsed time (``HH:MM``, ``HH:MM:SS`` or ``SS``) to a timedelta

        Return None when the time is not available or cannot be parsed.
        """
        if elaptime in ("--:--", "--"):
            return None
        hms = elaptime.split(":")
        hh = mm = ss = 0
        if len(hms) == 1:
            ss = hms[0]
        elif len(hms) == 2:
            hh, mm = hms
        else:
            hh, mm, ss = hms[-3:]
        try:
            return datetime.timedelta(seconds=int(ss), minutes=int(mm), hours=int(hh))
        except ValueError:
            return None

    def _parse_status_res_(self, res):
        """Parse the output of qstat into a list of job status dicts

        After the header lines, each job line is expected to contain 11
        whitespace-separated fields: job id, user, queue, name, session, nodes,
        tasks, memory, time, state and elapsed time. Other lines are ignored.
        """
        text = super()._parse_status_res_(res)
        out = []
        for line in text.splitlines()[self._status_header_lines :]:
            fields = line.split()
            if len(fields) != 11:
                continue
            jobid, _user, queue, name, _session, _nodes, _tasks, _mem, _time, state, elaptime = fields
            jobid = jobid.split(".")[0]
            status = self.status_names.get(state, JobStatus.UNKNOWN)
            status.jobid = jobid
            out.append(
                {
                    "jobid": jobid,
                    "queue": queue,
                    "name": name,
                    "time": self._parse_elapsed_(elaptime),
                    "status": status,
                }
            )
        return out

    @staticmethod
    def get_killed(content):
        """Check the terminated status from job output

        Return
        ------
        JobStatus, None
            :attr:`JobStatus.FAILED` when the job was killed for exceeding
            its walltime, else None
        """
        if "PBS: job killed: walltime" in content and "Terminated" in content:
            return JobStatus.FAILED
        return None
class SlurmJobManager(_Scheduler_):
    """Slurm Job Manager"""

    commands = {
        "submit": {
            "command": "sbatch",
            "options": {
                "name": "-J {}",
                "queue": "-p {}",
                "nnodes": "-N {}",
                "ncpus": "-c {}",
                "ngpus": "--gpus={}",
                "mem": "--mem={}",
                "pmem": "--mem-per-cpu={0} --mem-per-gpu={0}",
                "time": "--time={}",
                "depend": "--dependency=afterok:{} --kill-on-invalid-dep=yes",
                "log_out": "-o {}",
                "log_err": "-e {}",
                "script": "{}",
                "mail": "--mail-type=ALL --mail-user={}",
            },
        },
        "status": {
            "command": "squeue",
            "options": {
                "jobid": "--jobs={}",
                "queue": "--partition={}",
                "name": "--name={}",
                "users": "--users={}",
                "noheader": "--noheader",
            },
        },
        "history": {
            "command": "sacct",
            "options": {
                "jobid": "-j {}",
                "format": "--format=JobID,State,Elapsed",
                "parsable": "--parsable2",
                "noheader": "--noheader",
            },
        },
        "delete": {
            "command": "scancel",
            "options": {
                "jobid": "{}",
                "force": "--signal=KILL",
                "terminate": "--signal=TERM",
            },
        },
    }
    status_names = {
        "R": JobStatus.RUNNING,
        "CD": JobStatus.FINISHED,
        "PD": JobStatus.PENDING,
        "CG": JobStatus.COMPLETING,
        "COMPLETED": JobStatus.SUCCESS,
        "FAILED": JobStatus.FAILED,
        "CANCELLED": JobStatus.KILLED,
        "TIMEOUT": JobStatus.FAILED,
        "OUT_OF_MEMORY": JobStatus.FAILED,
        "NODE_FAIL": JobStatus.FAILED,
        "DEADLINE": JobStatus.FAILED,
    }
    # sacct returns full-word state names (different from squeue short codes).
    history_status_names = {
        "COMPLETED": JobStatus.SUCCESS,
        "FAILED": JobStatus.FAILED,
        "CANCELLED": JobStatus.KILLED,
        "TIMEOUT": JobStatus.FAILED,
        "OUT_OF_MEMORY": JobStatus.FAILED,
        "NODE_FAIL": JobStatus.FAILED,
        "BOOT_FAIL": JobStatus.FAILED,
        "DEADLINE": JobStatus.FAILED,
        "PREEMPTED": JobStatus.FAILED,
        "RUNNING": JobStatus.RUNNING,
        "PENDING": JobStatus.PENDING,
        "SUSPENDED": JobStatus.PENDING,
    }
    jobid_sep = ","

    def _extra_status_args_(self, args):
        """Remove the header from the squeue output (in place, list returned)"""
        args.append("--noheader")
        return args

    @staticmethod
    def _parse_submit_job_(job, stdout):
        """Set the job id from the last word of the sbatch output"""
        job.jobid = stdout.split()[-1]
        job.status.jobid = job.jobid

    @staticmethod
    def _parse_elapsed_(text):
        """Convert a squeue TIME field (``[days-][hours:]minutes:seconds``) to a timedelta

        Return None when the field cannot be parsed.
        """
        days = 0
        try:
            if "-" in text:
                day_text, text = text.split("-", 1)
                days = int(day_text)
            fields = [int(value) for value in text.split(":")]
        except ValueError:
            return None
        hh = mm = ss = 0
        if len(fields) == 1:
            (ss,) = fields
        elif len(fields) == 2:
            mm, ss = fields
        else:
            hh, mm, ss = fields[-3:]
        return datetime.timedelta(days=days, hours=hh, minutes=mm, seconds=ss)

    def _parse_status_res_(self, res):
        """Parse the squeue output into a list of job status dicts

        Expected columns: JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON).
        Lines with fewer columns are ignored.
        """
        text = super()._parse_status_res_(res)
        out = []
        for line in text.splitlines():
            info = line.split()
            if len(info) < 7:
                continue
            jobid, queue, name, _user, state, elapsed = info[:6]
            status = self.status_names.get(state, JobStatus.UNKNOWN)
            status.jobid = jobid
            out.append(
                {
                    "jobid": jobid,
                    "queue": queue,
                    "name": name,
                    "time": self._parse_elapsed_(elapsed),
                    "status": status,
                }
            )
        return out

    def _query_history_status_(self, jobid):
        """Query job status from sacct history

        Parameters
        ----------
        jobid : str
            Job ID to query

        Returns
        -------
        JobStatus
            Status from history or UNKNOWN if not found
        """
        args = self.get_command_args(
            "history",
            jobid=jobid,
            format=True,
            parsable=True,
            noheader=True,
        )
        logger.debug("Query history: " + " ".join(args))
        try:
            res = subprocess.run(args, capture_output=True, check=True, timeout=5)
            if res.returncode or not res.stdout:
                return JobStatus.UNKNOWN
            # Parse: JobID|State|Elapsed
            lines = res.stdout.decode("utf-8", errors="ignore").strip().split("\n")
            if not lines or not lines[0]:
                return JobStatus.UNKNOWN
            # Take the main job line, not job steps like 12345.batch
            for line in lines:
                if not line or ".batch" in line or ".extern" in line:
                    continue
                parts = line.split("|")
                if len(parts) >= 2:
                    # Extract just the job ID without steps
                    line_jobid = parts[0].split(".")[0]
                    if line_jobid != str(jobid):
                        continue
                    # Remove details like "CANCELLED by 12345"
                    state = parts[1].split()[0]
                    status = self.history_status_names.get(state, JobStatus.FINISHED)
                    status.jobid = jobid
                    logger.debug(f"Found job {jobid} in history with state: {state}")
                    return status
        except subprocess.TimeoutExpired:
            logger.warning(f"Timeout querying job history for {jobid}")
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to query job history for {jobid}: {e}")
        except Exception as e:
            # Best effort: the history is only a fallback for the status query
            logger.warning(f"Error parsing sacct output for {jobid}: {e}")
        return JobStatus.UNKNOWN

    @staticmethod
    def get_killed(content):
        """Check the terminated status from job output

        Return
        ------
        JobStatus, None
            :attr:`JobStatus.FAILED` when the output reports a time limit,
            a cancellation or an out-of-memory kill, else None
        """
        if ("DUE TO TIME LIMIT" in content or "CANCELLED AT" in content) or (
            "OUT OF MEMORY" in content or ("slurmstepd: error:" in content and "Killed process" in content)
        ):
            return JobStatus.FAILED
        return None