#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Commandline interface
"""
import argparse
import logging
import os
# from . import job as wjob
from . import conf as wconf
from . import ext as wext
from . import hosts as whosts
from . import log as wlog
from . import render as wrender
from . import tasks as wtasks
from . import util as wutil
from . import workflow as wworkflow
# %% Main
[docs]
def get_parser():
parser = argparse.ArgumentParser(
description="woom interface",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--app-name", help="application name")
parser.add_argument("--app-conf", help="application configuration")
parser.add_argument("--app-exp", help="application experiment")
parser.add_argument(
"--workflow-cfg",
default="workflow.cfg",
help="workflow configuration file",
)
parser.add_argument(
"--workflow-ini",
default="workflow.ini",
help="user workflow configuration specifications file",
)
parser.add_argument("--tasks-cfg", default="tasks.cfg", help="tasks configuration file")
parser.add_argument("--hosts-cfg", help="hosts configuration file", default="hosts.cfg")
parser.add_argument(
"--host",
help="target host as described in the hosts configuration file",
)
parser.add_argument("--begin-date", help="begin date", type=wconf.is_datetime)
parser.add_argument("--end-date", help="end date", type=wconf.is_datetime)
parser.add_argument("--freq", help="interval between cycles")
parser.add_argument("--ncycle", help="number of cycles", type=int)
subparsers = parser.add_subparsers(help="sub-command help")
add_parser_show(subparsers)
add_parser_run(subparsers)
add_parser_kill(subparsers)
add_parser_clean(subparsers)
add_parser_fill(subparsers)
add_parser_monitor(subparsers)
return parser
[docs]
def main():
# Get the parser
parser = get_parser()
# Parse args
args = parser.parse_args()
# Call subparser function
if hasattr(args, "func"):
args.func(parser, args)
elif hasattr(args, "subcommands"):
parser.exit(0, f"please use one of the subcommands: {args.subcommands}\n")
else:
parser.print_usage()
[docs]
def get_workflow_cfg(parser, args):
"""Workflow dir from workflow config file"""
workflow_cfg = os.path.abspath(args.workflow_cfg)
if not os.path.exists(workflow_cfg):
parser.error(f"Workflow configuration file not found: {args.workflow_cfg}")
return workflow_cfg
[docs]
def setup_logger(workflow_dir, args):
log_file = wutil.check_dir(os.path.join(workflow_dir, "log", "woom.log"), logger=False)
wlog.main_setup_logging(args, to_file=log_file)
return logging.getLogger(__name__)
[docs]
def setup_workflow(parser, args):
# Workflow file
workflow_cfg = get_workflow_cfg(parser, args)
# Get logger
logger = setup_logger(os.path.dirname(workflow_cfg), args)
# Setup the workflow
logger.debug("Run the workflow")
try:
workflow = get_workflow(workflow_cfg, logger, parser, args)
except Exception as e:
logger.exception(f"Workflow setup failed: {e.args[0]}")
return None, None
else:
logger.info("Successfully setup the workflow!")
return workflow, logger
[docs]
def get_workflow(workflow_cfg, logger, parser, args): # , clean):
# # Workflow dir
workflow_dir = os.path.dirname(workflow_cfg)
# Load extensions
logger.debug("Loading extensions")
exts = wext.load_extensions(workflow_dir)
if exts:
logger.info("Loaded extensions: " + ", ".join(exts))
else:
logger.info("No extension to load")
# Load workflow config
workflow_cfgspecs = [wworkflow.CFGSPECS_FILE]
if os.path.exists(args.workflow_ini):
logger.info(
f"Using user specific file for workflow configuration specifications: {args.workflow_ini}"
)
workflow_cfgspecs.append(args.workflow_ini)
logger.debug(f"Load workflow config: {workflow_cfg}")
workflow_config = wconf.load_cfg(workflow_cfg, workflow_cfgspecs, list_values=True)
logger.info("Loaded workflow config")
# App
wconf.merge_args_with_config(workflow_config, args, ["name", "conf", "exp"], prefix="app_")
# if workflow_config["app"]["name"] is None:
# workflow_config["app"]["name"] = os.path.basename(workflow_dir)
# logger.debug("Inferred app name from workflow dir: " + workflow_config["app"]["name"])
app_name = workflow_config["app"]["name"]
app_conf = workflow_config["app"]["conf"]
app_exp = workflow_config["app"]["exp"]
if app_name:
logger.info(f"App name: {app_name}")
if app_conf:
logger.info(f"App conf: {app_conf}")
if app_exp:
logger.info(f"App exp: {app_exp}")
# Cycles
wconf.merge_args_with_config(
workflow_config["cycles"],
args,
["begin_date", "end_date", "freq", "ncycle"],
)
# Get host
logger.debug("Initialize the host manager")
hostmanager = whosts.HostManager()
logger.info("Initialized the host manager")
if args.hosts_cfg:
logger.debug("Load hosts config file: " + args.hosts_cfg)
hostmanager.load_config(args.hosts_cfg)
logger.info("Loaded hosts config file: " + args.hosts_cfg)
if args.host:
logger.debug("Get host instance: " + args.host)
host = hostmanager.get_host(args.host)
logger.info("Got host instance: " + args.host)
else:
logger.debug("Infer host")
host = hostmanager.infer_host()
logger.info("Infered host: " + host.name)
# Init task manager
logger.debug("Initialize the task manager")
taskmanager = wtasks.TaskManager(host)
logger.info("Initialized the task manager")
logger.debug("Load the task config file: " + args.tasks_cfg)
taskmanager.load_config(args.tasks_cfg)
logger.info("Loaded the task config file: " + args.tasks_cfg)
# Init workflow
logger.debug("Initialize the workflow")
workflow = wworkflow.Workflow(workflow_config, taskmanager)
logger.info("Initialized the workflow")
return workflow
# %% Show
[docs]
def add_parser_show(subparsers):
# Setup argument parser
parser_show = subparsers.add_parser(
"show",
help="show info about the workflow",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
subparsers_show = parser_show.add_subparsers(help="sub-command help")
add_parser_show_overview(subparsers_show)
add_parser_show_status(subparsers_show)
add_parser_show_submission_dirs(subparsers_show)
add_parser_show_run_dirs(subparsers_show)
add_parser_show_artifacts(subparsers_show)
return parser_show
[docs]
def add_parser_show_overview(subparsers):
# Setup argument parser
parser_show_overview = subparsers.add_parser(
"overview",
help="show main info like the task tree and cycles",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
wlog.add_logging_parser_arguments(parser_show_overview, default_level="warning")
parser_show_overview.set_defaults(func=main_show_overview)
return parser_show_overview
[docs]
def main_show_overview(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return
# Show the status
try:
workflow.show_overview()
except Exception:
logger.exception("Failed to display the overview")
return 1
return 0
[docs]
def add_parser_show_status(subparsers):
# Setup argument parser
parser_show_status = subparsers.add_parser(
"status",
help="get the status of all jobs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_show_status.add_argument("-r", "--running", help="show only running jobs", action="store_true")
parser_show_status.add_argument(
"--tablefmt",
help="table format (see the tabulate package)",
default="rounded_outline",
)
parser_show_status.add_argument("--no-color", help="don't colorize the status", action="store_true")
wlog.add_logging_parser_arguments(parser_show_status, default_level="warning")
parser_show_status.set_defaults(func=main_show_status)
return parser_show_status
[docs]
def main_show_status(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Show the status
try:
workflow.show_status(
tablefmt=args.tablefmt,
running=args.running,
colorize=not args.no_color,
)
except Exception:
logger.exception("Failed querying the status")
return 1
return 0
[docs]
def add_parser_show_submission_dirs(subparsers):
# Setup argument parser
parser_show_submission_dirs = subparsers.add_parser(
"submission_dirs",
help="show the submission directory of all worklow tasks",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_show_submission_dirs.add_argument(
"--tablefmt",
help="table format (see the tabulate package)",
default="rounded_outline",
)
wlog.add_logging_parser_arguments(parser_show_submission_dirs, default_level="warning")
parser_show_submission_dirs.set_defaults(func=main_show_submission_dirs)
return parser_show_submission_dirs
[docs]
def main_show_submission_dirs(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Show
try:
workflow.show_submission_dirs(tablefmt=args.tablefmt)
except Exception:
logger.exception("Failed showing the submission directories")
return 1
return 1
[docs]
def add_parser_show_run_dirs(subparsers):
# Setup argument parser
parser_show_run_dirs = subparsers.add_parser(
"run_dirs",
help="show the run directory of all worklow tasks",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_show_run_dirs.add_argument(
"--tablefmt",
help="table format (see the tabulate package)",
default="rounded_outline",
)
wlog.add_logging_parser_arguments(parser_show_run_dirs, default_level="warning")
parser_show_run_dirs.set_defaults(func=main_show_run_dirs)
return parser_show_run_dirs
[docs]
def main_show_run_dirs(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Show
try:
workflow.show_run_dirs(tablefmt=args.tablefmt)
except Exception:
logger.exception("Failed showing the run directories")
return 1
return 1
[docs]
def add_parser_show_artifacts(subparsers):
# Setup argument parser
parser_show_artifacts = subparsers.add_parser(
"artifacts",
help="show the artifacts of all worklow tasks",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_show_artifacts.add_argument(
"--tablefmt",
help="table format (see the tabulate package)",
default="rounded_outline",
)
wlog.add_logging_parser_arguments(parser_show_artifacts, default_level="warning")
parser_show_artifacts.set_defaults(func=main_show_artifacts)
return parser_show_artifacts
[docs]
def main_show_artifacts(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Show
try:
workflow.show_artifacts(tablefmt=args.tablefmt)
except Exception:
logger.exception("Failed showing the run directories")
return 1
return 0
# %% Run
[docs]
def add_parser_run(subparsers):
# Setup argument parser
parser_run = subparsers.add_parser(
"run",
help="run a workflow",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_run.add_argument(
"--dry-run",
"--test",
help="run in fake mode for testing purpose",
action="store_true",
)
parser_run.add_argument(
"--force",
"--update",
help="do not run if it has already been run",
action="store_true",
)
parser_run.add_argument(
"--skip",
help=(
"task names to skip: not submitted but kept in the task treeso their artifacts remain accessible"
),
nargs="+",
metavar="TASK",
default=[],
)
wlog.add_logging_parser_arguments(parser_run)
parser_run.set_defaults(func=main_run)
return parser_run
[docs]
def main_run(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Run the workflow
logger.debug("Run the workflow")
try:
workflow.run(dry=args.dry_run, force=args.force, skip=args.skip)
except Exception as e:
logger.exception(f"Workflow failed: {e.args[0]}")
return 1
else:
logger.info("Successfully ran the workflow!")
return 0
# %% Kill
[docs]
def add_parser_kill(subparsers):
# Setup argument parser
parser_kill = subparsers.add_parser(
"kill",
help="kill one or all workflow jobs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_kill.add_argument("jobid", help="job id", nargs="*")
parser_kill.add_argument("--task", help="kill this task only", default=None)
parser_kill.add_argument("--cycle", help="kill this cycle only", default=None)
parser_kill.add_argument(
"--dry-run",
help="run in fake mode for testing purpose",
action="store_true",
)
wlog.add_logging_parser_arguments(parser_kill, default_level="warning")
parser_kill.set_defaults(func=main_kill)
return parser_kill
[docs]
def main_kill(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Kill
try:
workflow.kill(jobid=args.jobid, task_name=args.task, cycle=args.cycle)
except Exception:
logger.exception("Failed to kill jobs")
return 1
return 0
# %% Clean
[docs]
def add_parser_clean(subparsers):
# Setup argument parser
parser_clean = subparsers.add_parser(
"clean",
help="remove temporary files",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_clean.add_argument("extra_file", help="extra file or directory to remove", nargs="*")
parser_clean.add_argument(
"--without-submission-dirs",
help="do not remove submission directories",
action="store_true",
)
parser_clean.add_argument("--with-run-dirs", help="remove run directories", action="store_true")
parser_clean.add_argument("--with-log-files", help="remove log files", action="store_true")
parser_clean.add_argument("--with-artifacts", help="remove artifacts", action="store_true")
parser_clean.add_argument(
"--dry-run",
"--test",
help="run in fake mode for testing purpose",
action="store_true",
)
wlog.add_logging_parser_arguments(parser_clean, default_level="info")
parser_clean.set_defaults(func=main_clean)
return parser_clean
[docs]
def main_clean(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Kill running jobs
if not args.dry_run:
try:
workflow.kill()
except Exception:
logger.exception("Failed to kill all jobs")
return 1
# Show the status
try:
workflow.clean(
submission_dirs=not args.without_submission_dirs,
run_dirs=args.with_run_dirs,
log_files=args.with_log_files,
artifacts=args.with_artifacts,
dry=args.dry_run,
)
except Exception:
logger.exception("Failed to clean workflow")
return 1
return 0
# %% Fill
[docs]
def add_parser_fill(subparsers):
# Setup argument parser
parser_fill = subparsers.add_parser(
"fill",
help="remove temporary files",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_fill.add_argument("template", help="template file")
parser_fill.add_argument("destination", help="output file")
parser_fill.add_argument(
"--task-name",
help="target task name",
default=os.environ.get("WOOM_TASK_NAME"),
)
parser_fill.add_argument("--cycle", help="target cycle", default=os.environ.get("WOOM_CYCLE"))
parser_fill.add_argument("--member", help="target member", default=os.environ.get("WOOM_MEMBER"))
parser_fill.add_argument(
"--dry-run",
"--test",
help="run in fake mode for testing purpose",
action="store_true",
)
wlog.add_logging_parser_arguments(parser_fill, default_level="info")
parser_fill.set_defaults(func=main_fill)
return parser_fill
[docs]
def main_fill(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Fill templates
try:
# Set the context
if args.task_name:
workflow.set_context(task_name=args.task_name, cycle=args.cycle, member=args.member)
# Fill
logger.debug(f"Fill template: {args.template} → {args.destination}")
template = wrender.JINJA_ENV.get_template(args.template)
content = wrender.render(template, workflow.context)
# Write destination
destination = wutil.check_dir(args.destination, dry=args.dry_run, logger=logger)
if not args.dry_run:
with open(destination, "w") as f:
f.write(content)
logger.info(f"Filled template: {args.template} → {destination}")
except Exception:
logger.exception("Failed to fill template")
return 1
return 0
# %% Monitor
[docs]
def add_parser_monitor(subparsers):
# Setup argument parser
parser_monitor = subparsers.add_parser(
"monitor",
help="launch a web monitor for the workflow",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_monitor.add_argument("--port", type=int, default=5000, help="TCP port to listen on")
parser_monitor.add_argument("--bind", default="127.0.0.1", help="IP address to bind to")
parser_monitor.add_argument(
"--no-browser",
action="store_true",
help="do not automatically open a browser tab",
)
wlog.add_logging_parser_arguments(parser_monitor)
parser_monitor.set_defaults(func=main_monitor)
return parser_monitor
[docs]
def main_monitor(parser, args):
# Setup the workflow
workflow, logger = setup_workflow(parser, args)
if not workflow:
return 0
# Launch the web monitor (lazy import so Flask is optional)
from . import monitor as wmonitor
wmonitor.run_monitor(
workflow,
host=args.bind,
port=args.port,
open_browser=not args.no_browser,
)
return 0