Source code for woom.context

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Context for job script generation
"""

import json
import os
from collections import UserDict

from . import conf as wconf
from . import iters as witers
from . import util as wutil


[docs] class Context(UserDict): """Dict-like context that is used by jinja to fill templates Its content is accessible with keys like:: context = Context(workflow, task_name, cycle, member) print(context["task"].name) See :ref:`inputs_context` for a list of available keys. .. note:: Some of the items are available as attributes, like :attr:`workflow`. """
[docs] def __init__(self, workflow, task_name=None, cycle=None, member=None, extra_params=None): """Initialize rendering context Parameters ---------- workflow : Workflow Workflow instance task_name : str, optional Task name cycle : Cycle, str, optional Current cycle member : Member, optional Ensemble member extra_params : dict, optional Extra parameters for rendering """ initialdata = { "workflow": workflow, "host": workflow.host, "taskmanager": workflow.taskmanager, "jobmanager": workflow.jobmanager, "logger": workflow.logger, "task_tree": workflow.task_tree, "config": workflow.config, "cycles": workflow.cycles, "nmembers": workflow.nmembers, "members": workflow.members, "paths": workflow.paths, "app_path": workflow.get_app_path(), "env_vars": workflow.config["env_vars"].dict(), "os": os, } super().__init__(initialdata) # Config subsections params = workflow.config["params"].dict() del params["hosts"], params["tasks"] for sec in "app", "cycles": for key, val in workflow.config[sec].items(): self[f"{sec}_{key}"] = val self["app_path"] = workflow.get_app_path() self["app"] = workflow.config["app"].dict() self["app"]["path"] = workflow.get_app_path() # Host params self.update(workflow.host.get_params()) if workflow.host.name in workflow.config["params"]["hosts"]: host_params = wconf.strip_out_sections( workflow.config["params"]["hosts"][workflow.host.name] ).dict() self.update(host_params) # Workflow directories self.update(workflow_dir=workflow.workflow_dir, log_dir=os.path.join(workflow.workflow_dir, "log")) # Current cycle self["cycle"] = cycle if isinstance(cycle, witers.Cycle): self.update(cycle.get_params()) if isinstance(cycle.prev, witers.Cycle): self.update(cycle.prev.get_params(suffix="prev")) if isinstance(cycle.next, witers.Cycle): self.update(cycle.next.get_params(suffix="next")) # Current member self["member"] = member if member: self.update(member.params) # Current task self["task_name"] = task_name if task_name is None: self["task"] = None else: self["task"] = task = workflow.get_task(task_name) self.update( task_path=workflow.get_task_path(task_name, cycle, member), task_name=task_name, ) # Task specific params if task_name in workflow.config["params"]["tasks"]: task_params = workflow.config["params"]["tasks"][task_name].dict() params.update(task_params) # if workflow.host.name in workflow.config["params"]["tasks"][task_name]: # params.update( # wconf.strip_out_sections( # workflow.config["params"]["tasks"][task_name][workflow.host.name].dict() # ) # ) # Paths task_submission_dir = workflow.get_task_submission_dir(task_name, cycle, member) self.update( task_run_dir=task.run_dir, task_submission_dir=task_submission_dir, task_script_path=os.path.join(task_submission_dir, "job.sh"), ) for key in "run_dir", "submission_dir", "script_path": self[key] = self["task_" + key] # backward compat self["run_dir"] = self["task_run_dir"] task.env.prepend_paths(**workflow.paths) # Environment self["task_env"] = self["env"] = task.env # with backward compat # Json file self["task_context_json"] = self["context_json"] = os.path.join( task_submission_dir, "context.json" ) # with backward compat # Extra params if extra_params: params.update(extra_params) # Store params and set env vars self["params"] = params # Convert self.data (the underlying dict) to env vars, not self (which would cause recursion) env_vars = wutil.dict_to_env_vars( self.data, exclude=["context", "env_vars", "os", "logger", "config"] ) self["context"] = self self["env_vars"] = env_vars if self.task: self.task.env.vars_set.update(self["env_vars"])
def __repr__(self): return ( "<Context(<Worflow>, task_name={self['task_name']}," " cycle={self['cycle']}, member={self['member']})>" )
[docs] def copy(self): return Context(self.workflow)
@property def workflow(self): """The current :class:`~woom.workflow.Workflow` instance""" return self["workflow"] @property def config(self): """The workflow configuration""" return self["config"] @property def params(self): """A :class:`dict` of user parameters as declared in the 'params' section of the workflow configuration""" return self["params"] @property def env_vars(self): """A :class:`dict` of environment variables as declared in the workflow configuration""" return self["env_vars"] @property def task(self): """The current :class:`~woom.tasks.Tasks` instance or `None`""" return self.get("task") @property def cycle(self): """The current :class:`~woom.iters.Cycle` instance or `None`""" return self.get("cycle") @property def member(self): """The current :class:`~woom.iters.Member` instance or `None`""" return self.get("member") def __enter__(self): self._old_workflow_context = self.workflow._context self._old_task_context = self.task._context self.workflow.context = self self.task.context = self return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self, '_old_workflow_context'): self.workflow._context = self._old_workflow_context if hasattr(self.workflow, 'context'): delattr(self.workflow, 'context') if hasattr(self, '_old_task_context'): self.task._context = self._old_task_context if self.task.has_context(): delattr(self.task, 'context')
[docs] def to_json(self): """Export context to json""" if "context_json" in self: content = self.copy() if "context" in content: del content["context"] with open(self["context_json"], "w") as f: json.dump(content, f, indent=4, cls=wutil.WoomJSONEncoder)