Source code for woom.cli

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Commandline interface
"""

import argparse
import logging
import os

# from . import job as wjob
from . import conf as wconf
from . import ext as wext
from . import hosts as whosts
from . import log as wlog
from . import render as wrender
from . import tasks as wtasks
from . import util as wutil
from . import workflow as wworkflow

# %% Main


[docs] def get_parser(): parser = argparse.ArgumentParser( description="woom interface", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("--app-name", help="application name") parser.add_argument("--app-conf", help="application configuration") parser.add_argument("--app-exp", help="application experiment") parser.add_argument( "--workflow-cfg", default="workflow.cfg", help="workflow configuration file", ) parser.add_argument( "--workflow-ini", default="workflow.ini", help="user workflow configuration specifications file", ) parser.add_argument("--tasks-cfg", default="tasks.cfg", help="tasks configuration file") parser.add_argument("--hosts-cfg", help="hosts configuration file", default="hosts.cfg") parser.add_argument( "--host", help="target host as described in the hosts configuration file", ) parser.add_argument("--begin-date", help="begin date", type=wconf.is_datetime) parser.add_argument("--end-date", help="end date", type=wconf.is_datetime) parser.add_argument("--freq", help="interval between cycles") parser.add_argument("--ncycle", help="number of cycles", type=int) subparsers = parser.add_subparsers(help="sub-command help") add_parser_show(subparsers) add_parser_run(subparsers) add_parser_kill(subparsers) add_parser_clean(subparsers) add_parser_fill(subparsers) add_parser_monitor(subparsers) return parser
[docs] def main(): # Get the parser parser = get_parser() # Parse args args = parser.parse_args() # Call subparser function if hasattr(args, "func"): args.func(parser, args) elif hasattr(args, "subcommands"): parser.exit(0, f"please use one of the subcommands: {args.subcommands}\n") else: parser.print_usage()
[docs] def get_workflow_cfg(parser, args): """Workflow dir from workflow config file""" workflow_cfg = os.path.abspath(args.workflow_cfg) if not os.path.exists(workflow_cfg): parser.error(f"Workflow configuration file not found: {args.workflow_cfg}") return workflow_cfg
[docs] def setup_logger(workflow_dir, args): log_file = wutil.check_dir(os.path.join(workflow_dir, "log", "woom.log"), logger=False) wlog.main_setup_logging(args, to_file=log_file) return logging.getLogger(__name__)
[docs] def setup_workflow(parser, args): # Workflow file workflow_cfg = get_workflow_cfg(parser, args) # Get logger logger = setup_logger(os.path.dirname(workflow_cfg), args) # Setup the workflow logger.debug("Run the workflow") try: workflow = get_workflow(workflow_cfg, logger, parser, args) except Exception as e: logger.exception(f"Workflow setup failed: {e.args[0]}") return None, None else: logger.info("Successfully setup the workflow!") return workflow, logger
[docs] def get_workflow(workflow_cfg, logger, parser, args): # , clean): # # Workflow dir workflow_dir = os.path.dirname(workflow_cfg) # Load extensions logger.debug("Loading extensions") exts = wext.load_extensions(workflow_dir) if exts: logger.info("Loaded extensions: " + ", ".join(exts)) else: logger.info("No extension to load") # Load workflow config workflow_cfgspecs = [wworkflow.CFGSPECS_FILE] if os.path.exists(args.workflow_ini): logger.info( f"Using user specific file for workflow configuration specifications: {args.workflow_ini}" ) workflow_cfgspecs.append(args.workflow_ini) logger.debug(f"Load workflow config: {workflow_cfg}") workflow_config = wconf.load_cfg(workflow_cfg, workflow_cfgspecs, list_values=True) logger.info("Loaded workflow config") # App wconf.merge_args_with_config(workflow_config, args, ["name", "conf", "exp"], prefix="app_") # if workflow_config["app"]["name"] is None: # workflow_config["app"]["name"] = os.path.basename(workflow_dir) # logger.debug("Inferred app name from workflow dir: " + workflow_config["app"]["name"]) app_name = workflow_config["app"]["name"] app_conf = workflow_config["app"]["conf"] app_exp = workflow_config["app"]["exp"] if app_name: logger.info(f"App name: {app_name}") if app_conf: logger.info(f"App conf: {app_conf}") if app_exp: logger.info(f"App exp: {app_exp}") # Cycles wconf.merge_args_with_config( workflow_config["cycles"], args, ["begin_date", "end_date", "freq", "ncycle"], ) # Get host logger.debug("Initialize the host manager") hostmanager = whosts.HostManager() logger.info("Initialized the host manager") if args.hosts_cfg: logger.debug("Load hosts config file: " + args.hosts_cfg) hostmanager.load_config(args.hosts_cfg) logger.info("Loaded hosts config file: " + args.hosts_cfg) if args.host: logger.debug("Get host instance: " + args.host) host = hostmanager.get_host(args.host) logger.info("Got host instance: " + args.host) else: logger.debug("Infer host") host = hostmanager.infer_host() logger.info("Infered host: " + host.name) # Init task manager logger.debug("Initialize the task manager") taskmanager = wtasks.TaskManager(host) logger.info("Initialized the task manager") logger.debug("Load the task config file: " + args.tasks_cfg) taskmanager.load_config(args.tasks_cfg) logger.info("Loaded the task config file: " + args.tasks_cfg) # Init workflow logger.debug("Initialize the workflow") workflow = wworkflow.Workflow(workflow_config, taskmanager) logger.info("Initialized the workflow") return workflow
# %% Show
[docs] def add_parser_show(subparsers): # Setup argument parser parser_show = subparsers.add_parser( "show", help="show info about the workflow", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) subparsers_show = parser_show.add_subparsers(help="sub-command help") add_parser_show_overview(subparsers_show) add_parser_show_status(subparsers_show) add_parser_show_submission_dirs(subparsers_show) add_parser_show_run_dirs(subparsers_show) add_parser_show_artifacts(subparsers_show) return parser_show
[docs] def add_parser_show_overview(subparsers): # Setup argument parser parser_show_overview = subparsers.add_parser( "overview", help="show main info like the task tree and cycles", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) wlog.add_logging_parser_arguments(parser_show_overview, default_level="warning") parser_show_overview.set_defaults(func=main_show_overview) return parser_show_overview
[docs] def main_show_overview(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Show the status try: workflow.show_overview() except Exception: logger.exception("Failed to display the overview") return 1 return 0
[docs] def add_parser_show_status(subparsers): # Setup argument parser parser_show_status = subparsers.add_parser( "status", help="get the status of all jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_status.add_argument("-r", "--running", help="show only running jobs", action="store_true") parser_show_status.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline", ) parser_show_status.add_argument("--no-color", help="don't colorize the status", action="store_true") wlog.add_logging_parser_arguments(parser_show_status, default_level="warning") parser_show_status.set_defaults(func=main_show_status) return parser_show_status
[docs] def main_show_status(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Show the status try: workflow.show_status( tablefmt=args.tablefmt, running=args.running, colorize=not args.no_color, ) except Exception: logger.exception("Failed querying the status") return 1 return 0
[docs] def add_parser_show_submission_dirs(subparsers): # Setup argument parser parser_show_submission_dirs = subparsers.add_parser( "submission_dirs", help="show the submission directory of all worklow tasks", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_submission_dirs.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline", ) wlog.add_logging_parser_arguments(parser_show_submission_dirs, default_level="warning") parser_show_submission_dirs.set_defaults(func=main_show_submission_dirs) return parser_show_submission_dirs
[docs] def main_show_submission_dirs(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Show try: workflow.show_submission_dirs(tablefmt=args.tablefmt) except Exception: logger.exception("Failed showing the submission directories") return 1 return 1
[docs] def add_parser_show_run_dirs(subparsers): # Setup argument parser parser_show_run_dirs = subparsers.add_parser( "run_dirs", help="show the run directory of all worklow tasks", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_run_dirs.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline", ) wlog.add_logging_parser_arguments(parser_show_run_dirs, default_level="warning") parser_show_run_dirs.set_defaults(func=main_show_run_dirs) return parser_show_run_dirs
[docs] def main_show_run_dirs(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Show try: workflow.show_run_dirs(tablefmt=args.tablefmt) except Exception: logger.exception("Failed showing the run directories") return 1 return 1
[docs] def add_parser_show_artifacts(subparsers): # Setup argument parser parser_show_artifacts = subparsers.add_parser( "artifacts", help="show the artifacts of all worklow tasks", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_artifacts.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline", ) wlog.add_logging_parser_arguments(parser_show_artifacts, default_level="warning") parser_show_artifacts.set_defaults(func=main_show_artifacts) return parser_show_artifacts
[docs] def main_show_artifacts(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Show try: workflow.show_artifacts(tablefmt=args.tablefmt) except Exception: logger.exception("Failed showing the run directories") return 1 return 0
# %% Run
[docs] def add_parser_run(subparsers): # Setup argument parser parser_run = subparsers.add_parser( "run", help="run a workflow", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_run.add_argument( "--dry-run", "--test", help="run in fake mode for testing purpose", action="store_true", ) parser_run.add_argument( "--force", "--update", help="do not run if it has already been run", action="store_true", ) parser_run.add_argument( "--skip", help=( "task names to skip: not submitted but kept in the task treeso their artifacts remain accessible" ), nargs="+", metavar="TASK", default=[], ) wlog.add_logging_parser_arguments(parser_run) parser_run.set_defaults(func=main_run) return parser_run
[docs] def main_run(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Run the workflow logger.debug("Run the workflow") try: workflow.run(dry=args.dry_run, force=args.force, skip=args.skip) except Exception as e: logger.exception(f"Workflow failed: {e.args[0]}") return 1 else: logger.info("Successfully ran the workflow!") return 0
# %% Kill
[docs] def add_parser_kill(subparsers): # Setup argument parser parser_kill = subparsers.add_parser( "kill", help="kill one or all workflow jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_kill.add_argument("jobid", help="job id", nargs="*") parser_kill.add_argument("--task", help="kill this task only", default=None) parser_kill.add_argument("--cycle", help="kill this cycle only", default=None) parser_kill.add_argument( "--dry-run", help="run in fake mode for testing purpose", action="store_true", ) wlog.add_logging_parser_arguments(parser_kill, default_level="warning") parser_kill.set_defaults(func=main_kill) return parser_kill
[docs] def main_kill(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Kill try: workflow.kill(jobid=args.jobid, task_name=args.task, cycle=args.cycle) except Exception: logger.exception("Failed to kill jobs") return 1 return 0
# %% Clean
[docs] def add_parser_clean(subparsers): # Setup argument parser parser_clean = subparsers.add_parser( "clean", help="remove temporary files", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_clean.add_argument("extra_file", help="extra file or directory to remove", nargs="*") parser_clean.add_argument( "--without-submission-dirs", help="do not remove submission directories", action="store_true", ) parser_clean.add_argument("--with-run-dirs", help="remove run directories", action="store_true") parser_clean.add_argument("--with-log-files", help="remove log files", action="store_true") parser_clean.add_argument("--with-artifacts", help="remove artifacts", action="store_true") parser_clean.add_argument( "--dry-run", "--test", help="run in fake mode for testing purpose", action="store_true", ) wlog.add_logging_parser_arguments(parser_clean, default_level="info") parser_clean.set_defaults(func=main_clean) return parser_clean
[docs] def main_clean(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Kill running jobs if not args.dry_run: try: workflow.kill() except Exception: logger.exception("Failed to kill all jobs") return 1 # Show the status try: workflow.clean( submission_dirs=not args.without_submission_dirs, run_dirs=args.with_run_dirs, log_files=args.with_log_files, artifacts=args.with_artifacts, dry=args.dry_run, ) except Exception: logger.exception("Failed to clean workflow") return 1 return 0
# %% Fill
[docs] def add_parser_fill(subparsers): # Setup argument parser parser_fill = subparsers.add_parser( "fill", help="remove temporary files", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_fill.add_argument("template", help="template file") parser_fill.add_argument("destination", help="output file") parser_fill.add_argument( "--task-name", help="target task name", default=os.environ.get("WOOM_TASK_NAME"), ) parser_fill.add_argument("--cycle", help="target cycle", default=os.environ.get("WOOM_CYCLE")) parser_fill.add_argument("--member", help="target member", default=os.environ.get("WOOM_MEMBER")) parser_fill.add_argument( "--dry-run", "--test", help="run in fake mode for testing purpose", action="store_true", ) wlog.add_logging_parser_arguments(parser_fill, default_level="info") parser_fill.set_defaults(func=main_fill) return parser_fill
[docs] def main_fill(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Fill templates try: # Set the context if args.task_name: workflow.set_context(task_name=args.task_name, cycle=args.cycle, member=args.member) # Fill logger.debug(f"Fill template: {args.template}{args.destination}") template = wrender.JINJA_ENV.get_template(args.template) content = wrender.render(template, workflow.context) # Write destination destination = wutil.check_dir(args.destination, dry=args.dry_run, logger=logger) if not args.dry_run: with open(destination, "w") as f: f.write(content) logger.info(f"Filled template: {args.template}{destination}") except Exception: logger.exception("Failed to fill template") return 1 return 0
# %% Monitor
[docs] def add_parser_monitor(subparsers): # Setup argument parser parser_monitor = subparsers.add_parser( "monitor", help="launch a web monitor for the workflow", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_monitor.add_argument("--port", type=int, default=5000, help="TCP port to listen on") parser_monitor.add_argument("--bind", default="127.0.0.1", help="IP address to bind to") parser_monitor.add_argument( "--no-browser", action="store_true", help="do not automatically open a browser tab", ) wlog.add_logging_parser_arguments(parser_monitor) parser_monitor.set_defaults(func=main_monitor) return parser_monitor
[docs] def main_monitor(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return 0 # Launch the web monitor (lazy import so Flask is optional) from . import monitor as wmonitor wmonitor.run_monitor( workflow, host=args.bind, port=args.port, open_browser=not args.no_browser, ) return 0