Module prmf.script_utils

View Source
import os, os.path

import sys

import subprocess as sp

import multiprocessing as mp

import networkx as nx

import distutils.spawn

import hashlib

def args_to_list(args_dict):
  """
  Convert a mapping of option names to values into a flat command-line argument list

  Keys are visited in sorted order. Each key becomes a long option with underscores
  replaced by dashes (e.g. 'out_dir' -> '--out-dir'). Keys whose value is None are skipped.

  Parameters
  ----------
  args_dict : dict
    maps option name (str) to a scalar, a list/tuple of values, or None

  Returns
  -------
  rv : list of str
    option names followed by their value(s), every value converted with str()
  """
  rv = []
  for k in sorted(args_dict):
    v = args_dict[k]
    if v is None:
      continue
    rv.append("--" + k.replace('_', '-'))
    if isinstance(v, (list, tuple)):
      # stringify each element, as is done for scalars, so the result can be handed to subprocess
      rv.extend(str(item) for item in v)
    else:
      rv.append(str(v))
  return rv

def add_file_and_dir_args(parser):
  """
  Register the shared input/output options on parser

  Adds --infiles and --outfiles (one or more str values each) and
  --indir/-i and --outdir/-o (a single str each).

  Parameters
  ----------
  parser : object with an argparse-style add_argument method
  """
  # multi-valued file lists
  for flag in ("--infiles", "--outfiles"):
    parser.add_argument(flag, nargs='+', type=str)
  # single directories, each with a short alias
  for long_flag, short_flag in (("--indir", "-i"), ("--outdir", "-o")):
    parser.add_argument(long_flag, short_flag, type=str)

def check_file_and_dir_args(args):
  """
  Validate the options added by add_file_and_dir_args and pair inputs with outputs

  Modes are tried in this order:
    1. --infiles with --outfiles : paired positionally (exit status 23 if the counts differ)
    2. --indir with --outdir     : every entry of indir is paired with the same name under outdir
    3. --infiles with --outdir   : each infile is paired with its basename under outdir
  If none applies a message is written to stderr and the process exits with status 22.

  Parameters
  ----------
  args : object with attributes infiles, outfiles, indir, outdir

  Returns
  -------
  io_pairs : list of (str, str)
    (input filepath, output filepath) tuples
  """
  have_infiles = args.infiles is not None

  # mode 1: explicit file lists
  if have_infiles and args.outfiles is not None:
    if len(args.infiles) != len(args.outfiles):
      sys.stderr.write("Must provide the same number of infiles and outfiles\n")
      sys.exit(23)
    return list(zip(args.infiles, args.outfiles))

  # mode 2: mirror a directory listing
  if args.indir is not None and args.outdir is not None:
    return [
      (os.path.join(args.indir, name), os.path.join(args.outdir, name))
      for name in os.listdir(args.indir)
    ]

  # mode 3: explicit inputs, outputs named after them
  if have_infiles and args.outdir is not None:
    return [
      (in_fp, os.path.join(args.outdir, os.path.basename(in_fp)))
      for in_fp in args.infiles
    ]

  sys.stderr.write("Must provide --infiles and --outfiles or provide --indir and --outdir or provide --infiles and --outdir\n")
  sys.exit(22)

def run_command(outdir, cmd, *args, **kwargs):
  """Run command and throw error if non-zero exit code

  Parameters
  ----------
  outdir : str
    directory that receives the default log files
  cmd : str
    executable to run
  *args : str
    arguments passed to cmd

  Keyword Arguments
  -----------------
  condor : boolean
    if True, prefix the command with condor_submitter.sh (default False)
  stdout : io-like
    destination for the command's stdout; if absent, <outdir>/<basename of cmd>.out is written
  stderr : io-like
    destination for the command's stderr; if absent, <outdir>/<basename of cmd>.err is written

  Raises
  ------
  subprocess.CalledProcessError
    if the command exits with a non-zero status

  TODO
  ----
  change interface to match format_vars
  """
  argv = [cmd] + list(args)
  if kwargs.get('condor', False):
    argv = ["condor_submitter.sh"] + argv

  # Name the default logs after the basename of cmd: os.path.join discards outdir when
  # its second component is absolute, which would place the log next to the executable.
  log_stem = os.path.basename(cmd)
  opened = []  # handles created here; caller-supplied handles are left open
  try:
    if 'stdout' in kwargs:
      stdout_fh = kwargs['stdout']
    else:
      stdout_fh = open(os.path.join(outdir, "{}.out".format(log_stem)), "w")
      opened.append(stdout_fh)

    if 'stderr' in kwargs:
      stderr_fh = kwargs['stderr']
    else:
      stderr_fh = open(os.path.join(outdir, "{}.err".format(log_stem)), "w")
      opened.append(stderr_fh)

    sys.stdout.write("[STATUS] Launching {}\n".format(str(argv)))
    sp.check_call(argv, stdout=stdout_fh, stderr=stderr_fh)
  finally:
    for fh in opened:
      fh.close()

def run_command_cp(outdir, cmd, *args, **kwargs):
  """
  "Decorator" around run_command to enable checkpointing

  After a successful run a marker file named by a digest of the command line is written
  to outdir. A later call with the same command line finds the marker and does nothing.

  Parameters
  ----------
  outdir : str
    directory holding the marker files (and run_command's default logs)
  cmd : str
    executable to run
  *args : str
    arguments passed to cmd

  Keyword Arguments
  -----------------
  no_checkpoint : boolean
    if True, always run the command and write no marker (default False)
  remaining keyword arguments are forwarded to run_command
  """
  # honour the value of no_checkpoint rather than its mere presence
  no_checkpoint = bool(kwargs.pop('no_checkpoint', False))
  if no_checkpoint:
    run_command(outdir, cmd, *args, **kwargs)
    return

  # identify this command and its arguments (TODO and its environment?) with a hash value
  to_hash = [cmd] + list(args)
  # The built-in hash() of a str is salted per interpreter process, so it cannot identify
  # a command across runs; use a stable digest instead. The NUL separator keeps
  # ['ab', 'c'] distinct from ['a', 'bc'].
  hash_v = hashlib.sha256("\0".join(to_hash).encode()).hexdigest()
  hash_fp = os.path.join(outdir, hash_v)

  # command has previously been run successfully: dont run again
  if os.path.exists(hash_fp):
    return

  run_command(outdir, cmd, *args, **kwargs)
  # no error from check_call means the command succeeded; record that
  with open(hash_fp, 'w') as fh:
    fh.write("\t".join(to_hash))

def get_condor_submit_fp():
  """
  Build the filepath of the generic Condor submit description: $HOME/.condor/submitter.sub

  Returns
  -------
  str
    the path; its existence is not checked

  Raises
  ------
  ValueError
    if the environment variable HOME is not set

  TODO
  ----
  allow a different environment variable to specify the location of this file
  """
  if "HOME" not in os.environ:
    raise ValueError("Required environment variable: HOME")
  return os.path.join(os.environ["HOME"], ".condor", "submitter.sub")

def format_vars(job_id, exe=None, args=None, out=None, err=None, requirements=None, env=None, **kwargs):
  """
  Build the Condor DAG VARS statement for one job

  Parameters
  -------
  job_id : str
    Condor DAG job identifier
  exe : str
    executable to run; resolved with get_exe_path
  args : list of str
    arguments to exe (None or omitted means no arguments)
  out : str
    filepath to write stdout to
  err : str
    filepath to write stderr to
  requirements : str
    Condor-style requirements statement e.g. 'OpSysMajorVer == 7'
  env : str
    conda environment to execute job in; when given, prmf_runner.sh becomes the
    executable and env and the resolved exe are prepended to the arguments
  **kwargs
    ignored, so that a full node attribute dict can be passed

  Returns
  -------
  vars_stmt : str
    'VARS <job_id> executable="..."' followed by the space-separated arguments, output,
    error and requirements macros that apply

  Raises
  ------
  ValueError
    if exe is None, or if get_exe_path cannot resolve an executable
  """
  if exe is None:
    raise ValueError("keyword argument \"exe\" is required")
  args = [] if args is None else list(args)

  exe_path = get_exe_path(exe)
  exe_condor = exe_path # command condor will run
  if env is not None:
    args = [env, exe_path] + args
    exe_condor = get_exe_path('prmf_runner.sh')

  parts = ["VARS {} executable=\"{}\"".format(job_id, exe_condor)]
  if args:
    parts.append("arguments=\"{}\"".format(" ".join(args)))
  if out is not None:
    parts.append("output=\"{}\"".format(out))
  if err is not None:
    parts.append("error=\"{}\"".format(err))
  if requirements is not None:
    # same macroname="value" form as the other macros, separated from the previous one by a space
    parts.append("requirements=\"{}\"".format(requirements))
  return " ".join(parts)

def get_exe_path(inpath):
  """
  Resolve an executable to an absolute path and warn if it was found in the cwd

  A file at inpath itself (i.e. relative to the current working directory) is accepted
  first. Otherwise PATH is searched with shutil.which.

  Parameters
  ----------
  inpath : str
    executable name or path

  Returns
  -------
  outpath : str
    absolute path of the executable that was found

  Raises
  ------
  ValueError
    if no executable is found
  """
  # local import: shutil.which stands in for distutils.spawn.find_executable,
  # since distutils is no longer part of the standard library as of Python 3.12
  import shutil

  exe_path = inpath if os.path.isfile(inpath) else shutil.which(inpath)
  if exe_path is None:
    raise ValueError("desired executable {} is not on the PATH nor is it in the current working directory".format(inpath))

  outpath = os.path.abspath(exe_path)
  cwd_exe_path = os.path.abspath(os.path.join(os.curdir, inpath))
  if outpath == cwd_exe_path:
    # this exe may surprise some users because the lookup behaves differently from "which":
    # the current working directory is also considered in the search for the executable
    sys.stderr.write("[warning] found executable {} is in current working directory and may be a different version of {} than found according to the command-line program \"which\"\n".format(outpath, inpath))
  return outpath

def job_attrs_to_job_name(exe=None, args=None, out=None, err=None, **kwargs):
  """
  Derive a DAG job name from a job's executable and arguments

  The name is the SHA-256 hex digest of exe followed by the space-joined args, so it
  contains only hexadecimal characters. out, err and any other keyword are not part of it.

  http://research.cs.wisc.edu/htcondor/manual/v8.7/2_10DAGMan_Applications.html#SECTION003102100000000000000
  The JobName can be any string that contains no white space, except for the strings PARENT
  and CHILD (in upper, lower, or mixed case). JobName also cannot contain special characters
  ('.', '+') which are reserved for system use.

  Parameters
  ----------
  exe : str
    executable of the job
  args : list of str
    arguments of the job

  Returns
  -------
  job_name : str

  Raises
  ------
  ValueError
    if exe is None or args is not an iterable of str
  """
  if exe is None:
    raise ValueError("keyword argument \"exe\" is required")
  hash_obj = hashlib.sha256()
  hash_obj.update(exe.encode())
  try:
    hash_obj.update(" ".join(args).encode())
  except TypeError as type_err:
    # include the underlying reason in the message
    raise ValueError("Invalid args specification for exe {}: {}".format(exe, str(type_err)))
  return hash_obj.hexdigest()

def write_condor_dag(dag_fp, digraph):
  """
  Write a Condor DAG description file for the jobs in digraph

  For every node, in topological order, a JOB line and a VARS line are written. Then one
  PARENT .. CHILD line is written per edge. See run_digraph for the node attributes.

  Parameters
  ----------
  dag_fp : str
    filepath of the DAG file to (over)write
  digraph : nx.DiGraph
    job graph; node attributes are passed as keywords to job_attrs_to_job_name and format_vars
  """
  JOB_FMT_STR = "JOB {} {}"
  PARENT_FMT_STR = "PARENT {} CHILD {}"

  # get generic condor job description filepath; resolved before dag_fp is opened so that
  # a failure here does not leave an empty DAG file behind
  condor_submit_fp = get_condor_submit_fp()

  # nodes(data=True) and edges() are used because digraph.node and edges_iter()
  # are not available in newer networkx releases
  node_attrs = dict(digraph.nodes(data=True))
  job_int_to_name = {}
  with open(dag_fp, "w") as fh:
    for job_int in nx.topological_sort(digraph):
      job_attrs = node_attrs[job_int]
      job_name = job_attrs_to_job_name(**job_attrs)
      job_int_to_name[job_int] = job_name
      # write JOB declaration
      fh.write(JOB_FMT_STR.format(job_name, condor_submit_fp) + "\n")
      # write VARS declaration
      fh.write(format_vars(job_name, **job_attrs) + "\n")

    for parent, child in digraph.edges():
      # write PARENT .. CHILD declaration
      fh.write(PARENT_FMT_STR.format(job_int_to_name[parent], job_int_to_name[child]) + "\n")

def submit_condor_dag(dag_fp):
  """
  Submit a DAG description file by running condor_submit_dag on it

  Parameters
  ----------
  dag_fp : str
    filepath of the DAG file

  Raises
  ------
  subprocess.CalledProcessError
    if condor_submit_dag exits with a non-zero status
  """
  # TODO output files go in cwd or location of dag file
  sp.check_call(["condor_submit_dag", dag_fp])

def bfs_nodes(G, source):
  """
  List the nodes reachable from source in breadth-first order, source first

  Parameters
  ----------
  G : networkx graph
  source : node in G

  Returns
  -------
  node_list : list
    source followed by the head of every edge yielded by nx.bfs_edges(G, source)
  """
  # Seed the list with source itself rather than reading it from the first BFS edge,
  # which raised IndexError when source has no outgoing edges.
  node_list = [source]
  node_list.extend(child for _, child in nx.bfs_edges(G, source))
  return node_list

def run_digraph(outdir, digraph, condor=False, dry_run=False, root_node=0, exit_on_err=True, **kwargs):
  """
  Run a set of jobs specified by a directed (acyclic) graph

  Parameters
  ----------
  outdir : str
    directory the DAG file "digraph.dag" is written to when condor is True
  digraph : nx.DiGraph
    directed graph specifying job execution order; each node has the attributes 'exe'
    (executable) and 'args' (list of str arguments) and optionally 'out' and 'err'
    (filepaths receiving stdout and stderr) and 'env'
  condor : bool
    if True, write a Condor DAG file and submit it with condor_submit_dag
  dry_run : bool
    if True, do not submit the DAG (condor) or execute any job (local)
  root_node
    not used
  exit_on_err : bool
    if True (and condor False), stop execution of digraph when one of the job nodes fails
  **kwargs
    ignored

  Returns : TODO
  -------
  job_ids : list of str
    currently always empty

  Raises
  ------
  subprocess.CalledProcessError
    if a local job fails and exit_on_err is True, or if DAG submission fails
  """
  job_ids = []
  if condor:
    # then create a DAG description file for Condor
    dag_fp = os.path.join(outdir, "digraph.dag")
    # TODO job_ids?
    write_condor_dag(dag_fp, digraph)
    if not dry_run:
      submit_condor_dag(dag_fp)
    return job_ids

  # TODO synchronous only: jobs run one at a time, in topological order.
  # No multiprocessing pool is created here: the previous one was never used, was never
  # closed, and could not be constructed on a single-CPU host (cpu_count() - 1 == 0).
  # nodes(data=True) is used because digraph.node is not available in newer networkx releases.
  node_attrs = dict(digraph.nodes(data=True))
  env_warned = False
  for job_id in nx.topological_sort(digraph):
    job_attrs = node_attrs[job_id]
    if 'env' in job_attrs and not env_warned:
      sys.stderr.write("[WARNING] one or more jobs have an environment specified, but this functionality has only been implemented for condor\n")
      env_warned = True

    # launch this node's job
    args = [job_attrs['exe']] + job_attrs['args']
    # 'out' and 'err' are optional, so they must not be indexed directly
    out_fp = job_attrs.get('out')
    err_fp = job_attrs.get('err')
    sys.stdout.write("[STATUS] Launching {} > {} 2> {}\n".format(
      " ".join(args),
      out_fp if out_fp is not None else "<stdout>",
      err_fp if err_fp is not None else "<stderr>"))

    if dry_run:
      # nothing is executed, so the log files are not opened (and hence not truncated)
      continue

    opened = []  # log handles created for this job, closed once it finishes
    try:
      stdout_fh = sys.stdout
      if out_fp is not None:
        stdout_fh = open(out_fp, 'w')
        opened.append(stdout_fh)
      stderr_fh = sys.stderr
      if err_fp is not None:
        stderr_fh = open(err_fp, 'w')
        opened.append(stderr_fh)

      if exit_on_err:
        exit_code = sp.check_call(args, stdout=stdout_fh, stderr=stderr_fh)
      else:
        exit_code = sp.call(args, stdout=stdout_fh, stderr=stderr_fh)
    finally:
      for fh in opened:
        fh.close()
    print(exit_code)

  return job_ids

def parse_condor_submit_stdout(stdout):
  """
  Placeholder: nothing is parsed yet and None is returned for every input

  Parameters
  ----------
  stdout
    not used

  Returns
  -------
  None
    the intended return value is job_id : str, the job identifier from condor_submit stdout

  TODO
  ----
  are there python bindings for Condor that are more stable than parsing stdout?
  """
  return None

def get_revision_number():
  """
  Get the revision number by running get_revision_no.sh (presumably a git query; the
  script is not part of this module)

  Returns
  -------
  output of get_revision_no.sh with trailing whitespace removed, as returned by
  subprocess.check_output

  Raises
  ------
  CalledProcessError
    if get_revision_no.sh fails this function will propagate the error:
    we do not handle this exception because reproducibility is too often overlooked
  ValueError
    if environment variable PRMF_REPO_DIR is not set
  """
  env_var = 'PRMF_REPO_DIR'
  # the variable only has to be present; its value is not used any further here
  if os.environ.get(env_var) is None:
    raise ValueError("Missing environment variable {}".format(env_var))
  return sp.check_output(["get_revision_no.sh"]).rstrip()

def log_script(argv):
  """
  Write the invoking command line to stderr as one space-separated line

  Parameters
  ----------
  argv : list of str
    return value of sys.argv
  """
  command_line = " ".join(argv)
  sys.stderr.write("{}\n".format(command_line))
  # logging of the revision number is currently disabled:
  #sys.stderr.write("Revision: {}\n".format(get_revision_number()))

def mkdir_p(dirpath):
  """
  Create dirpath and any missing parent directories, like `mkdir -p`

  Parameters
  ----------
  dirpath : str
    directory to create; an already existing directory is not an error

  Raises
  ------
  OSError
    if dirpath could not be created and is not an existing directory
  """
  try:
    os.makedirs(dirpath)
  except OSError:
    # An existing directory is the expected, harmless case. Anything else (permission
    # denied, a regular file in the way) is reported instead of silently ignored.
    if not os.path.isdir(dirpath):
      raise

def str2bool(v):
  """
  Interpret a command-line value as a boolean (usable as an argparse `type=`)

  Parameters
  ----------
  v : bool or str
    a bool is returned unchanged; a str is matched case-insensitively against
    'yes', 'true', 't', 'y', '1' (True) and 'no', 'false', 'f', 'n', '0' (False)

  Returns
  -------
  bool

  Raises
  ------
  argparse.ArgumentTypeError
    if v is a str that matches neither set
  """
  # local import: the module's top-level imports do not include argparse, so the
  # error branch below would otherwise fail with NameError
  import argparse

  if isinstance(v, bool):
    return v
  lowered = v.lower()
  if lowered in ('yes', 'true', 't', 'y', '1'):
    return True
  if lowered in ('no', 'false', 'f', 'n', '0'):
    return False
  raise argparse.ArgumentTypeError('Boolean value expected.')

Functions

add_file_and_dir_args

def add_file_and_dir_args(
    parser
)
View Source
def add_file_and_dir_args(parser):
  """
  Register the shared input/output options on parser

  Adds --infiles and --outfiles (one or more str values each) and
  --indir/-i and --outdir/-o (a single str each).

  Parameters
  ----------
  parser : object with an argparse-style add_argument method
  """
  # multi-valued file lists
  for flag in ("--infiles", "--outfiles"):
    parser.add_argument(flag, nargs='+', type=str)
  # single directories, each with a short alias
  for long_flag, short_flag in (("--indir", "-i"), ("--outdir", "-o")):
    parser.add_argument(long_flag, short_flag, type=str)

args_to_list

def args_to_list(
    args_dict
)
View Source
def args_to_list(args_dict):
  """
  Convert a mapping of option names to values into a flat command-line argument list

  Keys are visited in sorted order. Each key becomes a long option with underscores
  replaced by dashes (e.g. 'out_dir' -> '--out-dir'). Keys whose value is None are skipped.

  Parameters
  ----------
  args_dict : dict
    maps option name (str) to a scalar, a list/tuple of values, or None

  Returns
  -------
  rv : list of str
    option names followed by their value(s), every value converted with str()
  """
  rv = []
  for k in sorted(args_dict):
    v = args_dict[k]
    if v is None:
      continue
    rv.append("--" + k.replace('_', '-'))
    if isinstance(v, (list, tuple)):
      # stringify each element, as is done for scalars, so the result can be handed to subprocess
      rv.extend(str(item) for item in v)
    else:
      rv.append(str(v))
  return rv

bfs_nodes

def bfs_nodes(
    G,
    source
)
View Source
def bfs_nodes(G, source):
  """
  List the nodes reachable from source in breadth-first order, source first

  Parameters
  ----------
  G : networkx graph
  source : node in G

  Returns
  -------
  node_list : list
    source followed by the head of every edge yielded by nx.bfs_edges(G, source)
  """
  # Seed the list with source itself rather than reading it from the first BFS edge,
  # which raised IndexError when source has no outgoing edges.
  node_list = [source]
  node_list.extend(child for _, child in nx.bfs_edges(G, source))
  return node_list

check_file_and_dir_args

def check_file_and_dir_args(
    args
)
View Source
def check_file_and_dir_args(args):
  """
  Validate the options added by add_file_and_dir_args and pair inputs with outputs

  Modes are tried in this order:
    1. --infiles with --outfiles : paired positionally (exit status 23 if the counts differ)
    2. --indir with --outdir     : every entry of indir is paired with the same name under outdir
    3. --infiles with --outdir   : each infile is paired with its basename under outdir
  If none applies a message is written to stderr and the process exits with status 22.

  Parameters
  ----------
  args : object with attributes infiles, outfiles, indir, outdir

  Returns
  -------
  io_pairs : list of (str, str)
    (input filepath, output filepath) tuples
  """
  have_infiles = args.infiles is not None

  # mode 1: explicit file lists
  if have_infiles and args.outfiles is not None:
    if len(args.infiles) != len(args.outfiles):
      sys.stderr.write("Must provide the same number of infiles and outfiles\n")
      sys.exit(23)
    return list(zip(args.infiles, args.outfiles))

  # mode 2: mirror a directory listing
  if args.indir is not None and args.outdir is not None:
    return [
      (os.path.join(args.indir, name), os.path.join(args.outdir, name))
      for name in os.listdir(args.indir)
    ]

  # mode 3: explicit inputs, outputs named after them
  if have_infiles and args.outdir is not None:
    return [
      (in_fp, os.path.join(args.outdir, os.path.basename(in_fp)))
      for in_fp in args.infiles
    ]

  sys.stderr.write("Must provide --infiles and --outfiles or provide --indir and --outdir or provide --infiles and --outdir\n")
  sys.exit(22)

format_vars

def format_vars(
    job_id,
    exe=None,
    args=[],
    out=None,
    err=None,
    requirements=None,
    env=None,
    **kwargs
)

Parameters

job_id : str Condor DAG job identifier

exe : str executable to run

args : list of str arguments to exe

out : str filepath to write stdout to

err : str filepath to write stderr to

requirements : str Condor-style requirements statement e.g. 'OpSysMajorVer == 7'

env : str conda environment to execute job in

Returns

vars_stmt : str

View Source
def format_vars(job_id, exe=None, args=None, out=None, err=None, requirements=None, env=None, **kwargs):
  """
  Build the Condor DAG VARS statement for one job

  Parameters
  -------
  job_id : str
    Condor DAG job identifier
  exe : str
    executable to run; resolved with get_exe_path
  args : list of str
    arguments to exe (None or omitted means no arguments)
  out : str
    filepath to write stdout to
  err : str
    filepath to write stderr to
  requirements : str
    Condor-style requirements statement e.g. 'OpSysMajorVer == 7'
  env : str
    conda environment to execute job in; when given, prmf_runner.sh becomes the
    executable and env and the resolved exe are prepended to the arguments
  **kwargs
    ignored, so that a full node attribute dict can be passed

  Returns
  -------
  vars_stmt : str
    'VARS <job_id> executable="..."' followed by the space-separated arguments, output,
    error and requirements macros that apply

  Raises
  ------
  ValueError
    if exe is None, or if get_exe_path cannot resolve an executable
  """
  if exe is None:
    raise ValueError("keyword argument \"exe\" is required")
  args = [] if args is None else list(args)

  exe_path = get_exe_path(exe)
  exe_condor = exe_path # command condor will run
  if env is not None:
    args = [env, exe_path] + args
    exe_condor = get_exe_path('prmf_runner.sh')

  parts = ["VARS {} executable=\"{}\"".format(job_id, exe_condor)]
  if args:
    parts.append("arguments=\"{}\"".format(" ".join(args)))
  if out is not None:
    parts.append("output=\"{}\"".format(out))
  if err is not None:
    parts.append("error=\"{}\"".format(err))
  if requirements is not None:
    # same macroname="value" form as the other macros, separated from the previous one by a space
    parts.append("requirements=\"{}\"".format(requirements))
  return " ".join(parts)

get_condor_submit_fp

def get_condor_submit_fp(

)

TODO

allow a different environment variable to specify the location of this file

View Source
def get_condor_submit_fp():
  """
  Build the filepath of the generic Condor submit description: $HOME/.condor/submitter.sub

  Returns
  -------
  str
    the path; its existence is not checked

  Raises
  ------
  ValueError
    if the environment variable HOME is not set

  TODO
  ----
  allow a different environment variable to specify the location of this file
  """
  if "HOME" not in os.environ:
    raise ValueError("Required environment variable: HOME")
  return os.path.join(os.environ["HOME"], ".condor", "submitter.sub")

get_exe_path

def get_exe_path(
    inpath
)

Wrapper around distutils.spawn.find_executable to provide warning if found executable is in cwd

View Source
def get_exe_path(inpath):
  """
  Resolve an executable to an absolute path and warn if it was found in the cwd

  A file at inpath itself (i.e. relative to the current working directory) is accepted
  first. Otherwise PATH is searched with shutil.which.

  Parameters
  ----------
  inpath : str
    executable name or path

  Returns
  -------
  outpath : str
    absolute path of the executable that was found

  Raises
  ------
  ValueError
    if no executable is found
  """
  # local import: shutil.which stands in for distutils.spawn.find_executable,
  # since distutils is no longer part of the standard library as of Python 3.12
  import shutil

  exe_path = inpath if os.path.isfile(inpath) else shutil.which(inpath)
  if exe_path is None:
    raise ValueError("desired executable {} is not on the PATH nor is it in the current working directory".format(inpath))

  outpath = os.path.abspath(exe_path)
  cwd_exe_path = os.path.abspath(os.path.join(os.curdir, inpath))
  if outpath == cwd_exe_path:
    # this exe may surprise some users because the lookup behaves differently from "which":
    # the current working directory is also considered in the search for the executable
    sys.stderr.write("[warning] found executable {} is in current working directory and may be a different version of {} than found according to the command-line program \"which\"\n".format(outpath, inpath))
  return outpath

get_revision_number

def get_revision_number(

)

Get revision number from git

Raises

CalledProcessError if git command fails to find the revision number this function will propagate the error: we do not handle this exception because reproducibility is too often overlooked

ValueError if environment variable PRMF_REPO_DIR is not set

View Source
def get_revision_number():
  """
  Get the revision number by running get_revision_no.sh (presumably a git query; the
  script is not part of this module)

  Returns
  -------
  output of get_revision_no.sh with trailing whitespace removed, as returned by
  subprocess.check_output

  Raises
  ------
  CalledProcessError
    if get_revision_no.sh fails this function will propagate the error:
    we do not handle this exception because reproducibility is too often overlooked
  ValueError
    if environment variable PRMF_REPO_DIR is not set
  """
  env_var = 'PRMF_REPO_DIR'
  # the variable only has to be present; its value is not used any further here
  if os.environ.get(env_var) is None:
    raise ValueError("Missing environment variable {}".format(env_var))
  return sp.check_output(["get_revision_no.sh"]).rstrip()

job_attrs_to_job_name

def job_attrs_to_job_name(
    exe=None,
    args=None,
    out=None,
    err=None,
    **kwargs
)

http://research.cs.wisc.edu/htcondor/manual/v8.7/2_10DAGMan_Applications.html#SECTION003102100000000000000 The JobName can be any string that contains no white space, except for the strings PARENT and CHILD (in upper, lower, or mixed case). JobName also cannot contain special characters ('.', '+') which are reserved for system use.

View Source
def job_attrs_to_job_name(exe=None, args=None, out=None, err=None, **kwargs):
  """
  Derive a DAG job name from a job's executable and arguments

  The name is the SHA-256 hex digest of exe followed by the space-joined args, so it
  contains only hexadecimal characters. out, err and any other keyword are not part of it.

  http://research.cs.wisc.edu/htcondor/manual/v8.7/2_10DAGMan_Applications.html#SECTION003102100000000000000
  The JobName can be any string that contains no white space, except for the strings PARENT
  and CHILD (in upper, lower, or mixed case). JobName also cannot contain special characters
  ('.', '+') which are reserved for system use.

  Parameters
  ----------
  exe : str
    executable of the job
  args : list of str
    arguments of the job

  Returns
  -------
  job_name : str

  Raises
  ------
  ValueError
    if exe is None or args is not an iterable of str
  """
  if exe is None:
    raise ValueError("keyword argument \"exe\" is required")
  hash_obj = hashlib.sha256()
  hash_obj.update(exe.encode())
  try:
    hash_obj.update(" ".join(args).encode())
  except TypeError as type_err:
    # include the underlying reason in the message
    raise ValueError("Invalid args specification for exe {}: {}".format(exe, str(type_err)))
  return hash_obj.hexdigest()

log_script

def log_script(
    argv
)

Parameters

argv : list of str return value of sys.argv

View Source
def log_script(argv):
  """
  Write the invoking command line to stderr as one space-separated line

  Parameters
  ----------
  argv : list of str
    return value of sys.argv
  """
  command_line = " ".join(argv)
  sys.stderr.write("{}\n".format(command_line))

mkdir_p

def mkdir_p(
    dirpath
)
View Source
def mkdir_p(dirpath):
  """
  Create dirpath and any missing parent directories, like `mkdir -p`

  Parameters
  ----------
  dirpath : str
    directory to create; an already existing directory is not an error

  Raises
  ------
  OSError
    if dirpath could not be created and is not an existing directory
  """
  try:
    os.makedirs(dirpath)
  except OSError:
    # An existing directory is the expected, harmless case. Anything else (permission
    # denied, a regular file in the way) is reported instead of silently ignored.
    if not os.path.isdir(dirpath):
      raise

parse_condor_submit_stdout

def parse_condor_submit_stdout(
    stdout
)

Returns

job_id : str job identifier from condor_submit stdout

TODO

are there python bindings for Condor that are more stable than parsing stdout?

View Source
def parse_condor_submit_stdout(stdout):
  """
  Placeholder: nothing is parsed yet and None is returned for every input

  Parameters
  ----------
  stdout
    not used

  Returns
  -------
  None
    the intended return value is job_id : str, the job identifier from condor_submit stdout

  TODO
  ----
  are there python bindings for Condor that are more stable than parsing stdout?
  """
  return None

run_command

def run_command(
    outdir,
    cmd,
    *args,
    **kwargs
)

Run command and throw error if non-zero exit code

Keyword Arguments

condor : boolean stdout : io-like stderr : io-like

TODO

change interface to match format_vars

View Source
def run_command(outdir, cmd, *args, **kwargs):
  """Run command and throw error if non-zero exit code

  Parameters
  ----------
  outdir : str
    directory that receives the default log files
  cmd : str
    executable to run
  *args : str
    arguments passed to cmd

  Keyword Arguments
  -----------------
  condor : boolean
    if True, prefix the command with condor_submitter.sh (default False)
  stdout : io-like
    destination for the command's stdout; if absent, <outdir>/<basename of cmd>.out is written
  stderr : io-like
    destination for the command's stderr; if absent, <outdir>/<basename of cmd>.err is written

  Raises
  ------
  subprocess.CalledProcessError
    if the command exits with a non-zero status

  TODO
  ----
  change interface to match format_vars
  """
  argv = [cmd] + list(args)
  if kwargs.get('condor', False):
    argv = ["condor_submitter.sh"] + argv

  # Name the default logs after the basename of cmd: os.path.join discards outdir when
  # its second component is absolute, which would place the log next to the executable.
  log_stem = os.path.basename(cmd)
  opened = []  # handles created here; caller-supplied handles are left open
  try:
    if 'stdout' in kwargs:
      stdout_fh = kwargs['stdout']
    else:
      stdout_fh = open(os.path.join(outdir, "{}.out".format(log_stem)), "w")
      opened.append(stdout_fh)

    if 'stderr' in kwargs:
      stderr_fh = kwargs['stderr']
    else:
      stderr_fh = open(os.path.join(outdir, "{}.err".format(log_stem)), "w")
      opened.append(stderr_fh)

    sys.stdout.write("[STATUS] Launching {}\n".format(str(argv)))
    sp.check_call(argv, stdout=stdout_fh, stderr=stderr_fh)
  finally:
    for fh in opened:
      fh.close()

run_command_cp

def run_command_cp(
    outdir,
    cmd,
    *args,
    **kwargs
)

"Decorator" around run_command to enable checkpointing

View Source
def run_command_cp(outdir, cmd, *args, **kwargs):
  """
  "Decorator" around run_command to enable checkpointing

  After a successful run a marker file named by a digest of the command line is written
  to outdir. A later call with the same command line finds the marker and does nothing.

  Parameters
  ----------
  outdir : str
    directory holding the marker files (and run_command's default logs)
  cmd : str
    executable to run
  *args : str
    arguments passed to cmd

  Keyword Arguments
  -----------------
  no_checkpoint : boolean
    if True, always run the command and write no marker (default False)
  remaining keyword arguments are forwarded to run_command
  """
  # honour the value of no_checkpoint rather than its mere presence
  no_checkpoint = bool(kwargs.pop('no_checkpoint', False))
  if no_checkpoint:
    run_command(outdir, cmd, *args, **kwargs)
    return

  # identify this command and its arguments (TODO and its environment?) with a hash value
  to_hash = [cmd] + list(args)
  # The built-in hash() of a str is salted per interpreter process, so it cannot identify
  # a command across runs; use a stable digest instead. The NUL separator keeps
  # ['ab', 'c'] distinct from ['a', 'bc'].
  hash_v = hashlib.sha256("\0".join(to_hash).encode()).hexdigest()
  hash_fp = os.path.join(outdir, hash_v)

  # command has previously been run successfully: dont run again
  if os.path.exists(hash_fp):
    return

  run_command(outdir, cmd, *args, **kwargs)
  # no error from check_call means the command succeeded; record that
  with open(hash_fp, 'w') as fh:
    fh.write("\t".join(to_hash))

run_digraph

def run_digraph(
    outdir,
    digraph,
    condor=False,
    dry_run=False,
    root_node=0,
    exit_on_err=True,
    **kwargs
)

Run a set of jobs specified by a directed (acyclic) graph

Parameters

digraph : nx.DiGraph directed graph specifying job execution order; nodes in the graph are assumed to have node attribute 'exe' which specifies the executable and node attribute 'args' which lists its arguments, and node identifiers in [0..n]

condor : bool if True, write a Condor DAG file and submit it with condor_submit_dag

dry_run : bool if True, do not submit the DAG

exit_on_err : bool if True (and condor False), stop execution of digraph when one of the job nodes fails

Returns : TODO

job_ids : list of str

View Source
def run_digraph(outdir, digraph, condor=False, dry_run=False, root_node=0, exit_on_err=True, **kwargs):
  """
  Run a set of jobs specified by a directed (acyclic) graph

  Parameters
  ----------
  outdir : str
    directory the DAG file "digraph.dag" is written to when condor is True
  digraph : nx.DiGraph
    directed graph specifying job execution order; each node has the attributes 'exe'
    (executable) and 'args' (list of str arguments) and optionally 'out' and 'err'
    (filepaths receiving stdout and stderr) and 'env'
  condor : bool
    if True, write a Condor DAG file and submit it with condor_submit_dag
  dry_run : bool
    if True, do not submit the DAG (condor) or execute any job (local)
  root_node
    not used
  exit_on_err : bool
    if True (and condor False), stop execution of digraph when one of the job nodes fails
  **kwargs
    ignored

  Returns : TODO
  -------
  job_ids : list of str
    currently always empty

  Raises
  ------
  subprocess.CalledProcessError
    if a local job fails and exit_on_err is True, or if DAG submission fails
  """
  job_ids = []
  if condor:
    # then create a DAG description file for Condor
    dag_fp = os.path.join(outdir, "digraph.dag")
    # TODO job_ids?
    write_condor_dag(dag_fp, digraph)
    if not dry_run:
      submit_condor_dag(dag_fp)
    return job_ids

  # TODO synchronous only: jobs run one at a time, in topological order.
  # No multiprocessing pool is created here: the previous one was never used, was never
  # closed, and could not be constructed on a single-CPU host (cpu_count() - 1 == 0).
  # nodes(data=True) is used because digraph.node is not available in newer networkx releases.
  node_attrs = dict(digraph.nodes(data=True))
  env_warned = False
  for job_id in nx.topological_sort(digraph):
    job_attrs = node_attrs[job_id]
    if 'env' in job_attrs and not env_warned:
      sys.stderr.write("[WARNING] one or more jobs have an environment specified, but this functionality has only been implemented for condor\n")
      env_warned = True

    # launch this node's job
    args = [job_attrs['exe']] + job_attrs['args']
    # 'out' and 'err' are optional, so they must not be indexed directly
    out_fp = job_attrs.get('out')
    err_fp = job_attrs.get('err')
    sys.stdout.write("[STATUS] Launching {} > {} 2> {}\n".format(
      " ".join(args),
      out_fp if out_fp is not None else "<stdout>",
      err_fp if err_fp is not None else "<stderr>"))

    if dry_run:
      # nothing is executed, so the log files are not opened (and hence not truncated)
      continue

    opened = []  # log handles created for this job, closed once it finishes
    try:
      stdout_fh = sys.stdout
      if out_fp is not None:
        stdout_fh = open(out_fp, 'w')
        opened.append(stdout_fh)
      stderr_fh = sys.stderr
      if err_fp is not None:
        stderr_fh = open(err_fp, 'w')
        opened.append(stderr_fh)

      if exit_on_err:
        exit_code = sp.check_call(args, stdout=stdout_fh, stderr=stderr_fh)
      else:
        exit_code = sp.call(args, stdout=stdout_fh, stderr=stderr_fh)
    finally:
      for fh in opened:
        fh.close()
    print(exit_code)

  return job_ids

str2bool

def str2bool(
    v
)
View Source
def str2bool(v):
  """
  Interpret a command-line value as a boolean (usable as an argparse `type=`)

  Parameters
  ----------
  v : bool or str
    a bool is returned unchanged; a str is matched case-insensitively against
    'yes', 'true', 't', 'y', '1' (True) and 'no', 'false', 'f', 'n', '0' (False)

  Returns
  -------
  bool

  Raises
  ------
  argparse.ArgumentTypeError
    if v is a str that matches neither set
  """
  # local import: the module's top-level imports do not include argparse, so the
  # error branch below would otherwise fail with NameError
  import argparse

  if isinstance(v, bool):
    return v
  lowered = v.lower()
  if lowered in ('yes', 'true', 't', 'y', '1'):
    return True
  if lowered in ('no', 'false', 'f', 'n', '0'):
    return False
  raise argparse.ArgumentTypeError('Boolean value expected.')

submit_condor_dag

def submit_condor_dag(
    dag_fp
)
View Source
def submit_condor_dag(dag_fp):
  """
  Submit a DAG description file by running condor_submit_dag on it

  Parameters
  ----------
  dag_fp : str
    filepath of the DAG file

  Raises
  ------
  subprocess.CalledProcessError
    if condor_submit_dag exits with a non-zero status
  """
  # TODO output files go in cwd or location of dag file
  sp.check_call(["condor_submit_dag", dag_fp])

write_condor_dag

def write_condor_dag(
    dag_fp,
    digraph
)

See run_digraph

View Source
def write_condor_dag(dag_fp, digraph):
  """
  Write a Condor DAG description file for the jobs in digraph

  For every node, in topological order, a JOB line and a VARS line are written. Then one
  PARENT .. CHILD line is written per edge. See run_digraph for the node attributes.

  Parameters
  ----------
  dag_fp : str
    filepath of the DAG file to (over)write
  digraph : nx.DiGraph
    job graph; node attributes are passed as keywords to job_attrs_to_job_name and format_vars
  """
  JOB_FMT_STR = "JOB {} {}"
  PARENT_FMT_STR = "PARENT {} CHILD {}"

  # get generic condor job description filepath; resolved before dag_fp is opened so that
  # a failure here does not leave an empty DAG file behind
  condor_submit_fp = get_condor_submit_fp()

  # nodes(data=True) and edges() are used because digraph.node and edges_iter()
  # are not available in newer networkx releases
  node_attrs = dict(digraph.nodes(data=True))
  job_int_to_name = {}
  with open(dag_fp, "w") as fh:
    for job_int in nx.topological_sort(digraph):
      job_attrs = node_attrs[job_int]
      job_name = job_attrs_to_job_name(**job_attrs)
      job_int_to_name[job_int] = job_name
      # write JOB declaration
      fh.write(JOB_FMT_STR.format(job_name, condor_submit_fp) + "\n")
      # write VARS declaration
      fh.write(format_vars(job_name, **job_attrs) + "\n")

    for parent, child in digraph.edges():
      # write PARENT .. CHILD declaration
      fh.write(PARENT_FMT_STR.format(job_int_to_name[parent], job_int_to_name[child]) + "\n")