Source code for woom.tasks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Task manager
"""

import functools
import os
import re

import configobj

from . import conf as wconf
from . import render as wrender
from .__init__ import WoomError

# Directory containing this module; the task specification file lives next to it
_MODULE_DIR = os.path.dirname(__file__)

#: Path to the configobj specification file used to validate task configurations
CFGSPECS_FILE = os.path.join(_MODULE_DIR, "tasks.ini")

# A comma, together with any whitespace surrounding it
_COMMA_SEPARATOR = re.compile(r"\s*,\s*")

#: Split a string on commas, dropping the whitespace around each comma
RE_SPLIT_COMMAS = _COMMA_SEPARATOR.split


class TaskError(WoomError):
    """Error raised while building the task tree, resolving or rendering tasks"""
class TaskTree:
    """Postprocess configuration to build a task tree

    The tree has the form ``{stage: {substage: [group, ...]}}`` where each
    group is a list of task names. Groups listed on the same substage line
    are rendered separated by ``//`` by :meth:`__str__`, and the tasks of a
    group are chained with ``->``.
    """

    def __init__(self, stages, groups=None):
        """
        Parameters
        ----------
        stages: :class:`configobj.Section`
            Sections of sub-stages, each sub-stage listing task or group names
        groups: None, :class:`configobj.Section`
            Group of tasks that can be used in stages
        """
        self._stages = configobj.ConfigObj(stages)
        self._groups = configobj.ConfigObj(groups)
        # Per-instance cache of the tree built by to_dict()
        # (functools.cache on a method would keep every instance alive)
        self._tree = None

    @staticmethod
    def _as_list(value):
        """Return a new list of names from a configuration value

        A string (e.g. a single-valued configobj entry) is split on commas and
        empty items are dropped; any other iterable is copied so that the
        input configuration is never mutated.
        """
        if isinstance(value, str):
            return [item for item in RE_SPLIT_COMMAS(value.strip()) if item]
        return list(value)

    def _build_tree(self):
        """Build the task tree from the stages and groups, checking task unicity"""
        seen = set()
        tree = {}
        for stage in self._stages.sections:  # prolog, tokens, epilog
            tree[stage] = {}
            # Loop on sub-stages: fetch=task1,group1,...
            for substage, tasks_line in self._stages[stage].items():
                substage_groups = []
                # Loop on parallel tasks
                for task in self._as_list(tasks_line):
                    if task in self._groups:
                        group = self._as_list(self._groups[task])
                    else:
                        group = [task]  # as a single element group
                    # Check unicity across the whole tree
                    for name in group:
                        if name in seen:
                            raise TaskError(f"Duplicate tasks not allowed: {name}")
                        seen.add(name)
                    substage_groups.append(group)
                tree[stage][substage] = substage_groups
        return tree

    def to_dict(self):
        """Get the task tree as a dict

        The tree is built on first call and the same object is returned on
        subsequent calls. The input configuration is left untouched.

        Returns
        -------
        dict
            ``{stage: {substage: [[task, ...], ...]}}``

        Raises
        ------
        TaskError
            When a task appears more than once in the tree
        """
        if self._tree is None:
            self._tree = self._build_tree()
        return self._tree

    def __str__(self):
        lines = []
        for stage, substages in self.to_dict().items():
            if not substages:
                continue
            lines.append(f"{stage}:")
            for substage, groups in substages.items():
                labels = [
                    group[0] if len(group) == 1 else "[" + " -> ".join(group) + "]"
                    for group in groups
                ]
                lines.append(f" - {substage}: " + " // ".join(labels))
        if not lines:
            return "Empty workflow!"
        return "\n".join(lines)
class TaskManager:
    """Load task configuration files and create :class:`Task` instances

    Parameters
    ----------
    host: :class:`~woom.hosts.Host`
        Host handed over to every created task
    """

    def __init__(self, host):
        self._host = host
        self._configs = []
        self._config = configobj.ConfigObj(interpolation=False)

    def load_config(self, cfgfile):
        """Load a task configuration file and refresh the merged configuration

        Parameters
        ----------
        cfgfile:
            Configuration file passed to :func:`woom.conf.load_cfg`, validated
            against :data:`CFGSPECS_FILE`
        """
        self._configs.append(wconf.load_cfg(cfgfile, CFGSPECS_FILE, list_values=False))
        self._postproc_()

    def _postproc_(self):
        """Merge every loaded configuration, then resolve ``inherit`` entries

        Inheritance passes are repeated until a pass leaves every task's
        ``inherit`` value unchanged by :func:`woom.conf.inherit_cfg`.

        Raises
        ------
        TaskError
            When a task inherits from an unknown task
        """
        if not self._configs:
            return
        for loaded in self._configs:
            self._config.merge(loaded)
        pending = True
        while pending:
            pending = False
            for task_name, section in self._config.items():
                if "inherit" not in section.scalars:
                    continue
                inherit = section["inherit"]
                if not inherit:
                    continue
                if inherit not in self._config:
                    raise TaskError(f"Wrong task name to inherit from: {inherit}")
                wconf.inherit_cfg(self._config[task_name], self._config[inherit])
                # If inheritance left the reference unchanged, it is resolved;
                # otherwise another pass is needed
                if self._config[task_name]["inherit"] == inherit:
                    self._config[task_name]["inherit"] = None
                else:
                    pending = True

    @property
    def host(self):
        """The host given at initialisation"""
        return self._host

    def get_task(self, name):
        """Get a :class:`Task` instance

        Parameters
        ----------
        name: str
            Known task name

        Return
        ------
        Task

        Raises
        ------
        TaskError
            When `name` is not a loaded task
        """
        if name in self._config:
            return Task(self._config[name], self.host)
        raise TaskError(f"Invalid task name: {name}")
class Task:
    """A workflow task built from its configuration section

    Parameters
    ----------
    taskconfig: :class:`configobj.Section`
        Configuration of this task
    host: :class:`~woom.hosts.Host`
        Host on which the task is run
    """

    def __init__(self, taskconfig, host):
        self._config = taskconfig
        self._host = host

    @property
    def config(self):
        """The task configuration as loaded from the :file:`tasks.cfg` (:class:`~configobj.ConfigObj`)"""
        return self._config

    @property
    def host(self):
        """The current :class:`~woom.hosts.Host` instance (:class:`~woom.hosts.Host`)"""
        return self._host

    @property
    def name(self):
        """The task name (:class:`str`)"""
        return self.config.name

    @functools.cached_property
    def env(self):
        """Instance of :class:`woom.env.EnvConfig` specific to this task (:class:`~woom.env.EnvConfig`)"""
        # Get env from name, possibly empty; copy so the host's env is not modified
        env = self.host.get_env(self.config["content"]["env"]).copy()
        # Add woom variables
        env.vars_set.update(WOOM_TASK_NAME=self.name)
        if self.run_dir:
            env.vars_set.update(WOOM_RUN_DIR=self.run_dir)
        return env

    @property
    def artifacts(self):
        """The artifacts as a dict of file names (:class:`dict`)"""
        return self.config["artifacts"]

    def get_run_dir(self):
        """Get the run directory

        Returns
        -------
        str
            The stripped ``content.run_dir`` value, the current working
            directory when it is ``"current"``, or ``""`` when it is unset.
        """
        run_dir = self.config["content"]["run_dir"]
        if run_dir is None:
            return ""
        # Strip before comparing so that surrounding blanks do not hide "current"
        run_dir = run_dir.strip()
        if run_dir == "current":
            return os.getcwd()
        return run_dir

    @functools.cached_property
    def run_dir(self):
        """The run directory as returned by :meth:`get_run_dir` (:class:`str`)"""
        return self.get_run_dir()

    def export_env(self, params=None):
        """Export the environment declarations as bash lines"""
        return self.env.render(params)

    def export_run_dir(self):
        """Export the bash lines to move to the running directory

        Returns an empty string when no run directory is set.
        """
        # NOTE(review): run_dir is inserted unquoted, so a path with spaces
        # breaks this line; double-quoting would however disable "~"
        # expansion — confirm the expected path syntax before changing it.
        if self.run_dir:
            return f"mkdir -p {self.run_dir} && cd {self.run_dir}"
        return ""

    def export_commandline(self):
        """Export the commandline as bash lines"""
        return self.config["content"]["commandline"]

    def export_artifacts_checking(self):
        """Export commandlines to check the existence of artifacts

        Each line exits with status 1, after echoing the missing artifact,
        when the file does not exist.
        """
        if not self.artifacts:
            return ""
        return "".join(
            f'test -f "{path}" || {{ echo artifact {name}="{path}"; exit 1; }}\n'
            for name, path in self.artifacts.items()
        )

    def render_artifacts(self, params):
        """Render artifact paths and make them absolute

        Parameters
        ----------
        params: dict
            Parameters used for substitution

        Returns
        -------
        dict
            Artifact names mapped to their rendered absolute paths. Relative
            rendered paths are joined to the task run directory.

        Raises
        ------
        TaskError
            When a rendered path is relative and the task has no run directory
        """
        if not self.artifacts:
            return {}
        artifacts = {}
        for name, path in self.artifacts.items():
            rendered = wrender.render(path.strip(), params)
            # Check the *rendered* path: a template such as "{{ workdir }}/out"
            # may expand to an absolute path
            if not os.path.isabs(rendered):
                if not self.run_dir:
                    raise TaskError(
                        f"Rendered artifact '{name}' of task '{self.name}' is not absolute "
                        "and task run_dir is not defined. Please fix it!"
                    )
                rendered = os.path.join(self.run_dir, rendered)
            artifacts[name] = rendered
        return artifacts

    def render_content(self, params):
        """Render the task content with jinja

        Rendering uses parameters and the :ref:`job.sh template <templates.job.sh>`

        Parameters
        ----------
        params: dict
            Parameters used for substitution

        Return
        ------
        str
        """
        # Work on a copy and expose the whole copy to the template as "params"
        params = params.copy()
        params["params"] = params
        return wrender.render(wrender.JINJA_ENV.get_template("job.sh"), params)

    def export_scheduler_options(self):
        """Export a dict of scheduler options

        Returns
        -------
        dict
            Empty when the host has no scheduler
        """
        if not self.host["scheduler"]:
            return {}
        submit = self.config["submit"]
        opts = {
            "memory": submit["memory"],
            "time": submit["time"],
            "mail": submit["mail"],
            "extra": submit["extra"].dict(),
        }
        # The queue name is translated through the host's queue mapping
        if submit["queue"]:
            opts["queue"] = self.host["queues"][submit["queue"]]
        return opts

    def export(self, params):
        """Export the rendered script, scheduler options and artifacts as a dict"""
        return {
            "script_content": self.render_content(params),
            "scheduler_options": self.export_scheduler_options(),
            "artifacts": self.render_artifacts(params),
        }