Source code for woom.tasks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Task manager
"""

import functools
import logging
import os
import re

import configobj

from . import conf as wconf
from . import render as wrender
from . import util as wutil
from .__init__ import WoomError

# Directory that contains this module; the bundled configuration files below
# are looked up next to it
thisdir = os.path.dirname(__file__)

#: Specifications file for tasks configuration
CFGSPECS_FILE = os.path.join(thisdir, "tasks.ini")

#: Callable that splits a string on commas, also consuming the whitespace
#: that surrounds each comma
RE_SPLIT_COMMAS = re.compile(r"\s*,\s*").split

#: Default tasks configuration
CFG_DEFAULT_FILE = os.path.join(thisdir, "tasks.cfg")


#: Registry that maps a generator name to a function that builds artifact paths.
#: :meth:`Task.get_artifact_path` calls the function with the entries of the task
#: context, updated with the artifact ``kwargs`` option, as keyword arguments.
ARTIFACTS_GENERATORS = {}


class TaskError(WoomError):
    """Error raised by the task tree, the task manager and the tasks themselves"""
class TaskTree:
    """Postprocess configuration to build a task tree

    The tree is a nested dict ``{stage: {substage: [group, ...]}}`` in which
    each group is a list of task names. A stage entry that names a known group
    is replaced by the tasks of that group, and any other entry becomes a
    single element group.
    """

    def __init__(self, stages, groups=None):
        """Initialize a task tree

        Parameters
        ----------
        stages : configobj.Section
            Workflow stages configuration
        groups : configobj.Section, optional
            Group of tasks that can be used in stages
        """
        self._stages = configobj.ConfigObj(stages)
        self._groups = configobj.ConfigObj(groups)
        # Built on first call to to_dict() and kept on the instance, so that
        # the cache does not outlive the tree
        self._tree = None

    @staticmethod
    def _as_list(value):
        """Return a new list from `value`, a lone string being a single item"""
        if isinstance(value, str):
            return [value]
        return list(value)

    def _build_tree(self):
        """Expand groups and check task unicity without altering the configuration"""
        seen = set()
        tree = {}
        for stage in self._stages.sections:  # prolog, tokens, epilog
            tree[stage] = {}

            # Loop on sub-stages
            for substage, tasks_line in self._stages[stage].items():  # fetch=task1,group1,...
                expanded = []

                # Loop on parallel tasks
                for entry in self._as_list(tasks_line):
                    if entry in self._groups:
                        # Copy the group so the stored configuration is never shared
                        group = self._as_list(self._groups[entry])
                    else:
                        group = [entry]  # as a single element group

                    # Check unicity
                    for task in group:
                        if task in seen:
                            raise TaskError(f"Duplicate tasks not allowed: {task}")
                        seen.add(task)
                    expanded.append(group)

                tree[stage][substage] = expanded
        return tree

    def to_dict(self):
        """Get the task tree as a nested dict

        The tree is computed once per instance and the same dict is returned
        by subsequent calls. The stages and groups configurations are left
        untouched.

        Returns
        -------
        dict
            ``{stage: {substage: [[task, ...], ...]}}``

        Raises
        ------
        TaskError
            When a task appears more than once in the workflow
        """
        # getattr: tolerate instances that were created without __init__
        tree = getattr(self, "_tree", None)
        if tree is None:
            tree = self._tree = self._build_tree()
        return tree

    def __str__(self):
        lines = []
        for stage, substages in self.to_dict().items():
            if not substages:
                continue
            lines.append(f"{stage}:")
            for substage, groups in substages.items():
                # A lone task is shown bare, a real group as "[t1 -> t2]"
                labels = [
                    group[0] if len(group) == 1 else "[" + " -> ".join(group) + "]" for group in groups
                ]
                lines.append(f" - {substage}: " + " // ".join(labels))
        if not lines:
            return "Empty workflow!"
        return "\n".join(lines).strip("\n")

    def get_task_stage(self, task_name):
        """Get the stage name of task

        It can be either "prolog", "cycles", "epilog" or None

        Parameters
        ----------
        task_name : str
            Name of the task to look for

        Returns
        -------
        str or None
            Name of the first stage that contains the task, or None when the
            task is not part of the tree
        """
        for stage, substages in self.to_dict().items():
            for groups in substages.values():
                if any(task_name in group for group in groups):
                    return stage
        return None
class TaskManager:
    """Manager of :class:`Task` instances"""

    def __init__(self, host):
        """Initialize a task manager

        The default tasks configuration is loaded right away; user files are
        added with :meth:`load_config`.

        Parameters
        ----------
        host : Host
            Host instance
        """
        self._configs = []
        self._config = wconf.load_cfg(
            CFG_DEFAULT_FILE,
            CFGSPECS_FILE,
            interpolation=False,
            list_values=True,
        )
        self._config_files = [CFG_DEFAULT_FILE]
        self._host = host
        self.logger = logging.getLogger(__name__)
        # One Task per name, kept on the instance so that the cache does not
        # outlive the manager
        self._tasks = {}

    def load_config(self, cfgfile):
        """Load a user configuration file

        .. note:: It is merged with the current one

        Parameters
        ----------
        cfgfile: str
            A valid config file
        """
        cfgfile = os.path.abspath(cfgfile)
        cfg = wconf.load_cfg(cfgfile, CFGSPECS_FILE, interpolation=False, list_values=True)
        self._config_files.append(cfgfile)
        self._configs.append(cfg)
        self._postproc_()

    def _postproc_(self):
        """Merge the user configurations then normalise the result"""
        if not self._configs:
            return

        # Merge
        for cfg in self._configs:
            self._config.merge(cfg)

        self._apply_inheritance()
        self._convert_legacy_artifacts()

    def _apply_inheritance(self):
        """Resolve the ``inherit`` option of tasks until none is left pending"""
        not_complete = True
        while not_complete:
            not_complete = False
            for name, content in self._config.items():
                if "inherit" not in content.scalars:
                    continue
                inherit = content["inherit"]
                if not inherit:
                    continue
                if inherit not in self._config:
                    raise TaskError(f"Wrong task name to inherit from: {inherit}")
                wconf.inherit_cfg(self._config[name], self._config[inherit])
                if self._config[name]["inherit"] == inherit:
                    # Nothing new to inherit from: mark the task as resolved
                    self._config[name]["inherit"] = None
                else:
                    # The option changed during inheritance: another pass is needed
                    not_complete = True

    def _convert_legacy_artifacts(self):
        """Turn artifacts given as plain values into artifact sections"""
        for content in self._config.values():
            artifacts = content["artifacts"]
            # Iterate over a copy: deleting a key also removes it from
            # ``scalars``, which would make the loop skip every other artifact
            for artifact_name in list(artifacts.scalars):
                path = artifacts[artifact_name]
                del artifacts[artifact_name]
                artifacts[artifact_name] = {
                    "path": path,
                    "check": True,
                    "callable": False,
                }

    def __str__(self):
        return os.pathsep.join(self._config_files)

    def to_json_entry(self):
        """Get the list of loaded configuration files, default one first"""
        return self._config_files

    @classmethod
    def from_config_files(cls, host, *config_files):
        """Create a task manager and load configuration files into it

        Parameters
        ----------
        host : Host
            Host instance
        config_files : str
            Configuration files, loaded in order with :meth:`load_config`

        Return
        ------
        TaskManager
        """
        taskmanager = cls(host)
        for config_file in config_files:
            taskmanager.load_config(config_file)
        return taskmanager

    @property
    def config(self):
        """The tasks configuration as loaded from the :file:`tasks.cfg` (:class:`~configobj.ConfigObj`)"""
        return self._config

    @property
    def host(self):
        """The associated :class:`~woom.hosts.Host` instance"""
        return self._host

    def get_task(self, name):
        """Get a :class:`Task` instance

        The same instance is returned each time a given name is requested.

        Parameters
        ----------
        name: str
            Known task name

        Return
        ------
        Task

        Raises
        ------
        TaskError
            When the name is not a section of the tasks configuration
        """
        if name not in self._config:
            raise TaskError(f"Invalid task name: {name}")

        try:
            return self._tasks[name]
        except KeyError:
            # Create instance
            task = self._tasks[name] = Task(self._config[name], self.host)
            return task
class Task:
    """A workflow task bound to its configuration section and to a host

    Rendering methods need a context, which must be set with
    :meth:`set_context` or through the :attr:`context` attribute.
    """

    def __init__(self, taskconfig, host):
        """Initialize a task

        Parameters
        ----------
        taskconfig : configobj.Section
            Task configuration section
        host : Host
            Host instance
        """
        self._config = taskconfig
        self._host = host
        self._context = None
        self.logger = logging.getLogger(__name__)

    @property
    def config(self):
        """The task configuration as loaded from the :file:`tasks.cfg` (:class:`~configobj.ConfigObj`)"""
        return self._config

    @property
    def host(self):
        """The current :class:`~woom.hosts.Host` instance (:class:`~woom.hosts.Host`)"""
        return self._host

    @property
    def name(self):
        """The task name (:class:`str`)"""
        return self.config.name

    def __str__(self):
        return self.name

    @property
    def is_blocking(self):
        """It is blocking?"""
        return self.config["submit"]["blocking"]

    @property
    def is_skipped(self):
        """Is this task statically skipped (set via ``skip = True`` in :file:`tasks.cfg`)?"""
        return self.config.get("skip", False)

    def set_context(self, context):
        """Set the context to be used for jinja rendering

        The task registers itself in the context under the ``"task"`` key.

        Parameters
        ----------
        context: woom.context.Context
        """
        if context is not None:
            context["task"] = self
        self._context = context

    @property
    def context(self):
        """Context to be used for rendering"""
        if self._context is None:
            raise TaskError("The context must be set before using it")
        # Make sure the context still refers to this task before it is used
        self._context["task"] = self
        return self._context

    @context.setter
    def context(self, context):
        self.set_context(context)

    @context.deleter
    def context(self):
        self._context = None

    def has_context(self):
        """Is the :attr:`context` already set for this task"""
        return self._context is not None

    @functools.cached_property
    def env(self):
        """Instance of :class:`woom.env.EnvConfig` specific to this task (:class:`~woom.env.EnvConfig`)"""
        # Get env from name, possibly empty
        env = self.host.get_env(self.config["content"]["env"]).copy()

        # Add woom variables
        env.vars_set.update(WOOM_TASK_NAME=self.name)
        if self.run_dir:
            env.vars_set.update(WOOM_RUN_DIR=self.run_dir)
        return env

    @property
    def artifacts(self):
        """The artifacts specifications as a dict (:class:`dict`)"""
        return self.get_artifacts()

    def get_artifacts(self):
        """Get the artifacts raw paths as a dict

        It generates the paths of all artifacts with
        :meth:`get_artifact_path`.

        Returns
        -------
        dict
            Keys are artifacts names and values are raw artifact paths,
            either a single path or a list of paths
        """
        return {name: self.get_artifact_path(name) for name in self.config["artifacts"]}

    def get_artifact_path(self, name):
        """Get the path of a single artifact from its name

        The path is evaluated at each call since it may depend on the current
        :attr:`context`, which changes when the task is re-used.

        Parameters
        ----------
        name: str
            Artifact name

        Returns
        -------
        str or list(str)
            A single or a list of raw artifact paths

        Raises
        ------
        TaskError
            When the artifact is declared as callable and its generator is
            not registered in :data:`ARTIFACTS_GENERATORS`, or when the
            context is needed but not set
        """
        specs = self.config["artifacts"][name].dict()
        if not specs["callable"]:
            return specs["path"]

        # The path option holds the name of the generator function
        func_name = specs["path"]
        if isinstance(func_name, list):
            func_name = func_name[0]
        if func_name not in ARTIFACTS_GENERATORS:
            raise TaskError(f"Artifact generator function not found: {func_name}")

        # Context entries first, so that the artifact kwargs take precedence
        kwargs = dict(self.context)
        kwargs.update(specs["kwargs"])
        return ARTIFACTS_GENERATORS[func_name](**kwargs)

    @functools.cached_property
    def run_dir(self):
        """Run directory"""
        run_dir = self.config["content"]["run_dir"]
        if run_dir is None:
            return ""
        if run_dir == "current":
            run_dir = os.getcwd()
        return run_dir.strip()

    @property
    def commandline(self):
        """Commandline as an bash lines"""
        return self.config["content"]["commandline"]

    def _render_artifact_path(self, name, raw_path):
        """Render a single raw artifact path and make it absolute"""
        rendered = wrender.render(raw_path.strip(), self.context)
        # Test the rendered path: a template can expand to an absolute path
        if os.path.isabs(rendered):
            return rendered
        run_dir = wrender.render(self.run_dir, self.context)
        if not run_dir:
            raise TaskError(
                f"Rendered artifact '{name}' of task '{self.name}' is not absolute "
                "and task run_dir is not defined. Please fix it!"
            )
        return os.path.join(run_dir, rendered)

    def render_artifacts(self):
        """Check that artifact paths are absolute and render them as dict

        A rendered path that is not absolute is joined to the rendered run
        directory.

        .. warning:: The :attr:`context` must be set with :meth:`set_context`

        Return
        ------
        dict
            Keys are artifacts names and values are rendered artifact paths,
            a single path when the raw path is a string, else a list

        Raises
        ------
        TaskError
            When a rendered path is not absolute and the task has no run
            directory
        """
        raw_artifacts = self.get_artifacts()
        if not raw_artifacts:
            return {}

        artifacts = {}
        for name, raw in raw_artifacts.items():
            if isinstance(raw, str):
                artifacts[name] = self._render_artifact_path(name, raw)
            else:
                artifacts[name] = [self._render_artifact_path(name, raw_path) for raw_path in raw]
        return artifacts

    def render_content(self):
        """Render the task content with jinja

        Rendering uses parameters and the template defined in the configuration,
        which defaults to :ref:`job.sh<templates.job.sh>`.

        Return
        ------
        str
        """
        template = wrender.JINJA_ENV.get_template(self.config["content"]["template"])
        return wrender.render(template, self.context)

    def fill_templates(self, dry=False):
        """Fill static user template files

        Parameters
        ----------
        dry: bool
            When true, the destination files are not written
        """
        for name in self.config["fill"].sections:
            config = self.config["fill"][name]

            # Render paths
            template_file = wrender.render(config["template"], self.context)
            destination = wrender.render(config["destination"], self.context)

            # Fill template
            self.logger.debug("Fill template '%s': %s -> %s", name, template_file, destination)
            template = wrender.JINJA_ENV.get_template(template_file)
            content = wrender.render(template, self.context)

            # Write destination
            destination = wutil.check_dir(destination, dry=dry, logger=self.logger)
            if not dry:
                with open(destination, "w") as f:
                    f.write(content)
                self.logger.info("Filled template '%s': %s -> %s", name, template_file, destination)

    def export_scheduler_options(self):
        """Export a dict of scheduler options

        Returns
        -------
        dict
            Empty when the host has no scheduler
        """
        if not self.host.config["scheduler"]:
            return {}
        opts = wconf.strip_out_sections(self.config["submit"]).dict()

        # Render Jinja2 templates in submit options (e.g. nnodes = {{ params.nnodes }})
        if self._context is not None:
            opts = {
                key: wrender.render(value, self.context) if isinstance(value, str) else value
                for key, value in opts.items()
            }

        # Replace the queue option by the matching entry of the host queues
        queue = self.config["submit"]["queue"]
        if queue:
            opts["queue"] = self.host.config["queues"][queue]
        return opts

    def export(self):
        """Export the task specification with rendering

        .. warning:: The :attr:`context` must be set with :meth:`set_context`

        Return
        ------
        dict
            With keys ``script_content``, ``scheduler_options`` and ``artifacts``
        """
        return {
            "script_content": self.render_content(),
            "scheduler_options": self.export_scheduler_options(),
            "artifacts": self.render_artifacts(),
        }