Source code for pyjob.script

import enum
import os
import sys

from pyjob.cexec import is_exe
from pyjob.exception import PyJobError
from pyjob.pool import Pool


[docs]@enum.unique class ScriptProperty(enum.Enum): """Enumeration for :obj:`~pyjob.script.Script`-specific properties""" # Tried to extend Enum but operation not allowed in Python3.7 # https://docs.python.org/3/library/enum.html#restricted-subclassing-of-enumerations if sys.platform.startswith("win"): PERL = ("", ".pl") PYTHON = ("", ".py") SHELL = ("", ".bat") else: PERL = ("#!/usr/bin/env perl", ".pl") PYTHON = ("#!/usr/bin/env python", ".py") SHELL = ("#!/bin/bash", ".sh") def __init__(self, shebang, suffix): self.shebang = shebang self.suffix = suffix
EXE_EXT = ".exe" if sys.platform.startswith("win") else "" SCRIPT_HEADER, SCRIPT_EXT = (ScriptProperty.SHELL.shebang, ScriptProperty.SHELL.suffix)
[docs]class ScriptCollector(object): """A :obj:`~pyjob.script.ScriptCollector` to store executable :obj:`~pyjob.script.Script` instances Examples -------- >>> from pyjob.script import ScriptCollector, Script >>> collector = ScriptCollector(None) >>> for _ in range(5): ... collector.add(Script()) """ def __init__(self, scripts): """Instantiate a new :obj:`~pyjob.script.ScriptCollector`""" self._container = [] self._save_script(scripts) def __iter__(self): """Iterator function""" for script in self._container: yield script def __len__(self): """Length function""" return len(self._container) def __repr__(self): """Representation function""" return f"{self.__class__.__qualname__}(nscripts={len(self)})" @property def scripts(self): """The script file paths""" return self._container @scripts.setter def scripts(self, scripts): """The script file paths Parameters ---------- script : :obj:`~pyjob.script.Script`, str, list, tuple Something representing one or more scripts Raises ------ :exc:`~pyjob.exception.PyJobError` Script cannot be found or is not executable """ self._container = [] self._save_script(scripts)
[docs] def add(self, scripts): """Add one or more script file paths Parameters ---------- script : :obj:`~pyjob.script.Script`, str, list, tuple Something representing one or more scripts Raises ------ :exc:`~pyjob.exception.PyJobError` Script cannot be found or is not executable """ self._save_script(scripts)
[docs] def dump(self): """Write all scripts to disk if not already done""" for script in self._container: if not os.path.isfile(script.path): script.write()
def _save_script(self, script): """Helper function to assess/standardise executable input Parameters ---------- script : :obj:`~pyjob.executable.Script`, str, list, tuple Something representing one or more executables Raises ------ :exc:`~pyjob.exception.PyJobError` Unrecognised executable input """ if script is None: return elif isinstance(script, Script): self._container.append(script) elif isinstance(script, str): script = Script.read(script) self._container.append(script) elif isinstance(script, (list, tuple)): for s in script: self._save_script(s) else: raise PyJobError("Unrecognised executable input")
[docs]class Script(list): """Simple extension to :obj:`list` to hold the contents for an executable script Examples -------- >>> from pyjob import Script >>> script = Script(directory='.', prefix='example', stem='', suffix='.sh') >>> script.append('sleep 5') >>> print(script) #!/bin/bash sleep 5 """ def __init__( self, *, shebang=ScriptProperty.SHELL.shebang, directory=".", prefix="tmp", stem="pyjob", suffix=ScriptProperty.SHELL.suffix, ): """Instantiate a new :obj:`~pyjob.script.Script` Parameters ---------- shebang : str, optional The Shebang line in the :obj:`~pyjob.script.Script` directory : str, optional The directory for :obj:`~pyjob.script.Script` storage prefix : str, optional The :obj:`~pyjob.script.Script` filename prefix stem : str, optional The :obj:`~pyjob.script.Script` filename middle suffix : str, optional The :obj:`~pyjob.script.Script` filename suffix """ self.directory = directory self.prefix = prefix self.stem = stem self.suffix = suffix self.shebang = shebang def __add__(self, other): """Combine two :obj:`~pyjob.script.Script` instances""" if self.shebang != other.shebang: raise TypeError("Invalid shebang combination") if self.suffix != other.suffix: raise TypeError("Invalid suffix combination") script = Script( stem=self.stem + "-" + other.stem, shebang=self.shebang, suffix=self.suffix ) script.content = [line for script in [self, other] for line in script] return script def __str__(self): """Content of :obj:`~pyjob.script.Script`""" content = self[:] if len(self.shebang) > 0: content.insert(0, self.shebang) return "\n".join(map(str, content)) @property def content(self): """Getter method for :attr:`~pyjob.script.Script` content""" return self @content.setter def content(self, content): """Setter method for :attr:`~pyjob.script.Script` content""" while len(self) > 0: self.pop() self.extend(content) @property def directory(self): """Getter method for :attr:`~pyjob.script.Script.directory`""" return self._directory @directory.setter def directory(self, directory): """Setter method for :attr:`~pyjob.script.Script.directory`""" self._directory = os.path.abspath(directory) @property def log(self): """Path to the log of the the :obj:`~pyjob.script.Script`""" return self.path.rsplit(".", 1)[0] + ".log" @property def path(self): """Path to the :obj:`~pyjob.script.Script`""" return os.path.join(self.directory, self.prefix + self.stem + self.suffix) @property def suffix(self): """:obj:`~pyjob.script.Script` file suffix""" return self._suffix @suffix.setter def suffix(self, value): """:obj:`~pyjob.script.Script` file suffix""" if value is None or len(value) < 1 or "." not in value: raise ValueError("Script suffix required!") self._suffix = value
[docs] def write(self): """Write the :obj:`~pyjob.script.Script` to :attr:`~pyjob.script.Script.path`""" fname = self.path with open(fname, "w") as f_out: f_out.write(str(self)) os.chmod(fname, 0o777)
[docs] def cleanup(self): """Cleanup :attr:`~pyjob.script.Script.path` and :attr:`~pyjob.script.Script.log` files.""" fnames = [self.path, self.log, self.path.replace(".script", ".jobs")] for fname in fnames: if os.path.isfile(fname): os.remove(fname)
[docs] @staticmethod def read(path): """Read a script file to construct a :obj:`~pyjob.script.Script` Examples -------- >>> from pyjob import read_script >>> script = read_script('./example.sh') >>> print(script) #!/bin/bash sleep 5 Parameters ---------- path : str The path to a script file Returns ------- :obj:`~pyjob.script.Script` A :obj:`~pyjob.script.Script` instance """ directory, fname = os.path.split(path) fname, ext = os.path.splitext(fname) script = Script(directory=directory, prefix="", stem=fname, suffix=ext) with open(path, "r") as f: lines = f.read().splitlines() if len(lines) > 0 and lines[0][:2] == "#!": script.shebang = lines.pop(0) else: script.shebang = "" script.extend(lines) return script
[docs]class LocalScriptCreator(object): """A :obj:`~pyjob.script.ScriptCollector` to store executable :obj:`~pyjob.script.Script` instances created in parallel using an input ``func`` to create the scripts. Examples -------- >>> from pyjob.script import LocalScriptCreator, Script >>> script_creator = LocalScriptCreator(func, iterable, processes) >>> collector = script_creator.collector """ def __init__(self, *, func=None, iterable=None, processes=1): """Instantiate a new :obj:`~pyjob.script.LocalScriptCreator` Parameters ---------- func : func function to create :obj:`~pyjob.script.Script` with custom command iterable : iterable iterable argument to input into func processes : int Number of processes to generate scripts with """ self.func = func self.iterable = iterable self.processes = processes def __call__(self, i): return self.func(i) @property def collector(self): script_collector = ScriptCollector(None) with Pool(processes=self.processes) as pool: script_collector.add(pool.map(self, self.iterable)) return script_collector
[docs]def is_valid_script_path(fname): """Validate a script path Parameters ---------- fname : str The path to a script file Returns ------- bool """ return is_exe(fname)