Source code for woom.cli

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Commandline interface
"""

import argparse

# from pathlib import Path
import logging
import os

# from . import job as wjob
from . import conf as wconf
from . import ext as wext
from . import hosts as whosts
from . import log as wlog
from . import tasks as wtasks
from . import util as wutil
from . import workflow as wworkflow

# %% Main


[docs] def get_parser(): parser = argparse.ArgumentParser( description="woom interface", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("--app-name", help="application name") parser.add_argument("--app-conf", help="application configuration") parser.add_argument("--app-exp", help="application experiment") parser.add_argument("--workflow-cfg", default="workflow.cfg", help="workflow configuration file") parser.add_argument( "--workflow-ini", default="workflow.ini", help="user workflow configuration specifications file", ) parser.add_argument("--tasks-cfg", default="tasks.cfg", help="tasks configuration file") parser.add_argument("--hosts-cfg", help="hosts configuration file", default="hosts.cfg") parser.add_argument("--host", help="target host as described in the hosts configuration file") parser.add_argument("--begin-date", help="begin date", type=wconf.is_datetime) parser.add_argument("--end-date", help="end date", type=wconf.is_datetime) parser.add_argument("--freq", help="interval between cycles") parser.add_argument("--ncycle", help="number of cycles", type=int) subparsers = parser.add_subparsers(help="sub-command help") add_parser_show(subparsers) add_parser_run(subparsers) add_parser_kill(subparsers) add_parser_clean(subparsers) return parser
[docs] def main(): # Get the parser parser = get_parser() # Parse args args = parser.parse_args() # Call subparser function if hasattr(args, "func"): args.func(parser, args) elif hasattr(args, "subcommands"): parser.exit(0, "please use one of the subcommands: " f"{args.subcommands}\n") else: parser.print_usage()
[docs] def get_workflow_cfg(parser, args): """Workflow dir from workflow config file""" workflow_cfg = os.path.abspath(args.workflow_cfg) if not os.path.exists(workflow_cfg): parser.error(f"Workflow configuration file not found: {args.workflow_cfg}") return workflow_cfg
[docs] def setup_logger(workflow_dir, args): log_file = wutil.check_dir(os.path.join(workflow_dir, "log", "woom.log"), logger=False) wlog.main_setup_logging(args, to_file=log_file) return logging.getLogger(__name__)
[docs] def setup_workflow(parser, args): # Workflow file workflow_cfg = get_workflow_cfg(parser, args) # Get logger logger = setup_logger(os.path.dirname(workflow_cfg), args) # Setup the workflow logger.debug("Run the workflow") try: workflow = get_workflow(workflow_cfg, logger, parser, args) except Exception as e: logger.exception(f"Workflow setup failed: {e.args[0]}") return None, None else: logger.info("Successfully setup the workflow!") return workflow, logger
[docs] def get_workflow(workflow_cfg, logger, parser, args): # , clean): # # Workflow dir workflow_dir = os.path.dirname(workflow_cfg) # Load extensions logger.debug("Loading extensions") exts = wext.load_extensions(workflow_dir) if exts: logger.info("Loaded extensions: " + ", ".join(exts)) else: logger.info("No extension to load") # Load workflow config workflow_cfgspecs = [wworkflow.CFGSPECS_FILE] if os.path.exists(args.workflow_ini): logger.info( f"Using user specific file for workflow configuration specifications: {args.workflow_ini}" ) workflow_cfgspecs.append(args.workflow_ini) logger.debug(f"Load workflow config: {workflow_cfg}") workflow_config = wconf.load_cfg(workflow_cfg, workflow_cfgspecs, list_values=True) logger.info("Loaded workflow config") # App wconf.merge_args_with_config(workflow_config, args, ["name", "conf", "exp"], prefix="app_") app_name = workflow_config["app"]["name"] app_conf = workflow_config["app"]["conf"] app_exp = workflow_config["app"]["exp"] if app_name: logger.info(f"App name: {app_name}") if app_conf: logger.info(f"App conf: {app_conf}") if app_exp: logger.info(f"App exp: {app_exp}") # app = dict(app_name=app_name, app_conf=app_conf, app_exp=app_exp) # Cycles wconf.merge_args_with_config( workflow_config["cycles"], args, ["begin_date", "end_date", "freq", "ncycle"], ) # Get host logger.debug("Initialize the host manager") hostmanager = whosts.HostManager() logger.info("Initialized the host manager") if args.hosts_cfg: logger.debug("Load hosts config file: " + args.hosts_cfg) hostmanager.load_config(args.hosts_cfg) logger.info("Loaded hosts config file: " + args.hosts_cfg) if args.host: logger.debug("Get host instance: " + args.host) host = hostmanager.get_host(args.host) logger.info("Got host instance: " + args.host) else: logger.debug("Infer host") host = hostmanager.infer_host() logger.info("Infered host: " + host.name) # Init task manager logger.debug("Initialize the task manager") taskmanager = wtasks.TaskManager(host) logger.info("Initialized the task manager") logger.debug("Load the task config file: " + args.tasks_cfg) taskmanager.load_config(args.tasks_cfg) logger.info("Loaded the task config file: " + args.tasks_cfg) # Init workflow logger.debug("Initialize the workflow") workflow = wworkflow.Workflow(workflow_config, taskmanager) logger.info("Initialized the workflow") return workflow
# %% Show
[docs] def add_parser_show(subparsers): # Setup argument parser parser_show = subparsers.add_parser( "show", help="show info about the workflow", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) subparsers_show = parser_show.add_subparsers(help="sub-command help") add_parser_show_overview(subparsers_show) add_parser_show_status(subparsers_show) add_parser_show_run_dirs(subparsers_show) add_parser_show_artifacts(subparsers_show) return parser_show
[docs] def add_parser_show_overview(subparsers): # Setup argument parser parser_show_overview = subparsers.add_parser( "overview", help="show main info like the task tree and cycles", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) wlog.add_logging_parser_arguments(parser_show_overview, default_level="warning") parser_show_overview.set_defaults(func=main_show_overview) return parser_show_overview
[docs] def main_show_overview(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Show the status try: workflow.show_overview() except Exception: logger.exception("Failed to display the overview")
[docs] def add_parser_show_status(subparsers): # Setup argument parser parser_show_status = subparsers.add_parser( "status", help="get the status of all jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_status.add_argument("-r", "--running", help="show only running jobs", action="store_true") parser_show_status.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline" ) parser_show_status.add_argument("--no-color", help="don't colorize the status", action="store_true") wlog.add_logging_parser_arguments(parser_show_status, default_level="warning") parser_show_status.set_defaults(func=main_show_status) return parser_show_status
[docs] def main_show_status(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Show the status try: workflow.show_status(tablefmt=args.tablefmt, running=args.running, colorize=not args.no_color) except Exception: logger.exception("Failed querying the status")
[docs] def add_parser_show_run_dirs(subparsers): # Setup argument parser parser_show_run_dirs = subparsers.add_parser( "run_dirs", help="show the run directory of all worklow tasks", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_run_dirs.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline" ) wlog.add_logging_parser_arguments(parser_show_run_dirs, default_level="warning") parser_show_run_dirs.set_defaults(func=main_show_run_dirs) return parser_show_run_dirs
[docs] def main_show_run_dirs(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Show try: workflow.show_run_dirs(tablefmt=args.tablefmt) except Exception: logger.exception("Failed showing the run directories")
[docs] def add_parser_show_artifacts(subparsers): # Setup argument parser parser_show_artifacts = subparsers.add_parser( "artifacts", help="show the run directory of all worklow tasks", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_show_artifacts.add_argument( "--tablefmt", help="table format (see the tabulate package)", default="rounded_outline" ) wlog.add_logging_parser_arguments(parser_show_artifacts, default_level="warning") parser_show_artifacts.set_defaults(func=main_show_artifacts) return parser_show_artifacts
[docs] def main_show_artifacts(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Show try: workflow.show_artifacts(tablefmt=args.tablefmt) except Exception: logger.exception("Failed showing the run directories")
# %% Run
[docs] def add_parser_run(subparsers): # Setup argument parser parser_run = subparsers.add_parser( "run", help="run a workflow", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_run.add_argument( "--dry-run", "--test", help="run in fake mode for testing purpose", action="store_true", ) parser_run.add_argument( "--update", help="do not run if it has already been run", action="store_true", ) wlog.add_logging_parser_arguments(parser_run) parser_run.set_defaults(func=main_run) return parser_run
[docs] def main_run(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Run the workflow logger.debug("Run the workflow") try: workflow.run(dry=args.dry_run, update=args.update) except Exception as e: logger.exception(f"Workflow failed: {e.args[0]}") else: logger.info("Successfully ran the workflow!")
# %% Kill
[docs] def add_parser_kill(subparsers): # Setup argument parser parser_kill = subparsers.add_parser( "kill", help="kill one or all workflow jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_kill.add_argument("jobid", help="job id", nargs="*") parser_kill.add_argument("--task", help="kill this task only", default=None) parser_kill.add_argument("--cycle", help="kill this cycle only", default=None) parser_kill.add_argument( "--dry-run", help="run in fake mode for testing purpose", action="store_true", ) wlog.add_logging_parser_arguments(parser_kill, default_level="warning") parser_kill.set_defaults(func=main_kill) return parser_kill
[docs] def main_kill(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Kill try: workflow.kill(jobid=args.jobid, task_name=args.task, cycle=args.cycle) except Exception: logger.exception("Failed to kill jobs")
# %% Clean
[docs] def add_parser_clean(subparsers): # Setup argument parser parser_clean = subparsers.add_parser( "clean", help="remove temporary files", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser_clean.add_argument("extra_file", help="extra file or directory to remove", nargs="*") parser_clean.add_argument( "--without-submission-dirs", help="do not remove submission directories", action="store_true", ) parser_clean.add_argument("--with-run-dirs", help="remove run directories", action="store_true") parser_clean.add_argument("--with-log-files", help="remove log files", action="store_true") parser_clean.add_argument("--with-artifacts", help="remove artifacts", action="store_true") parser_clean.add_argument( "--dry-run", "--test", help="run in fake mode for testing purpose", action="store_true", ) wlog.add_logging_parser_arguments(parser_clean, default_level="info") parser_clean.set_defaults(func=main_clean) return parser_clean
[docs] def main_clean(parser, args): # Setup the workflow workflow, logger = setup_workflow(parser, args) if not workflow: return # Kill running jobs if not args.dry_run: try: workflow.kill() except Exception: logger.exception("Failed to kill all jobs") # Show the status try: workflow.clean( submission_dirs=not args.without_submission_dirs, run_dirs=args.with_run_dirs, log_files=args.with_log_files, artifacts=args.with_artifacts, dry=args.dry_run, ) except Exception: logger.exception("Failed to clean workflow")