Source code for pyjob.script

# MIT License
#
# Copyright (c) 2017-18 Felix Simkovic
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__author__ = 'Felix Simkovic'
__contributors__ = ['Adam Simpkin']
__version__ = '1.0'

import enum
import os
import sys

from pyjob.cexec import is_exe
from pyjob.exception import PyJobError
from pyjob.pool import Pool


@enum.unique
class ScriptProperty(enum.Enum):
    """Platform-dependent shebang line and file suffix for each supported script type

    Every member's value is a ``(shebang, suffix)`` tuple; both parts are
    also exposed as the ``shebang`` and ``suffix`` attributes of the member.
    On platforms whose name starts with ``win`` the shebang is empty.

    """

    # Tried to extend Enum but operation not allowed in Python3.7
    # https://docs.python.org/3/library/enum.html#restricted-subclassing-of-enumerations
    if not sys.platform.startswith('win'):
        PERL = ('#!/usr/bin/env perl', '.pl')
        PYTHON = ('#!/usr/bin/env python', '.py')
        SHELL = ('#!/bin/bash', '.sh')
    else:
        PERL = ('', '.pl')
        PYTHON = ('', '.py')
        SHELL = ('', '.bat')

    def __init__(self, shebang, suffix):
        # Enum unpacks the member's tuple value into these two arguments
        self.shebang, self.suffix = shebang, suffix
# File extension of executables on the current platform
EXE_EXT = '.exe' if sys.platform.startswith('win') else ''

# Shebang line and file suffix of a shell script on the current platform
SCRIPT_HEADER = ScriptProperty.SHELL.shebang
SCRIPT_EXT = ScriptProperty.SHELL.suffix
class ScriptCollector(object):
    """Container for executable :obj:`~pyjob.script.Script` instances

    Examples
    --------
    >>> from pyjob.script import ScriptCollector, Script
    >>> collector = ScriptCollector(None)
    >>> for _ in range(5):
    ...     collector.add(Script())

    """

    def __init__(self, scripts):
        """Instantiate a new :obj:`~pyjob.script.ScriptCollector`

        Parameters
        ----------
        scripts : :obj:`~pyjob.script.Script`, str, list, tuple
           Something representing one or more scripts, or None for an empty collector

        """
        self._container = []
        self._save_script(scripts)

    def __iter__(self):
        """Yield the stored scripts one by one"""
        for entry in self._container:
            yield entry

    def __len__(self):
        """Number of stored scripts"""
        return len(self._container)

    def __repr__(self):
        """Short textual summary including the number of stored scripts"""
        return '{}(nscripts={})'.format(type(self).__name__, len(self))

    @property
    def scripts(self):
        """The list of stored :obj:`~pyjob.script.Script` instances"""
        return self._container

    @scripts.setter
    def scripts(self, scripts):
        """Replace all stored scripts

        Parameters
        ----------
        scripts : :obj:`~pyjob.script.Script`, str, list, tuple
           Something representing one or more scripts

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           Unrecognised executable input

        """
        self._container = []
        self._save_script(scripts)

    def add(self, scripts):
        """Append one or more scripts to the collector

        Parameters
        ----------
        scripts : :obj:`~pyjob.script.Script`, str, list, tuple
           Something representing one or more scripts

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           Unrecognised executable input

        """
        self._save_script(scripts)

    def dump(self):
        """Write every stored script whose path is not yet a file on disk"""
        unwritten = (entry for entry in self._container if not os.path.isfile(entry.path))
        for entry in unwritten:
            entry.write()

    def _save_script(self, script):
        """Helper function to normalise the input and store the resulting scripts

        Parameters
        ----------
        script : :obj:`~pyjob.script.Script`, str, list, tuple
           ``None`` is ignored, a :obj:`~pyjob.script.Script` is stored as is,
           a str is read with :meth:`~pyjob.script.Script.read`, and a list or
           tuple is processed element by element

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           Unrecognised executable input

        """
        if script is None:
            return
        # Script subclasses list, so it has to be recognised before the
        # generic list/tuple branch further down.
        if isinstance(script, Script):
            self._container.append(script)
            return
        if isinstance(script, str):
            self._container.append(Script.read(script))
            return
        if isinstance(script, (list, tuple)):
            for item in script:
                self._save_script(item)
            return
        raise PyJobError('Unrecognised executable input')
class Script(list):
    """Simple extension to :obj:`list` to hold the contents for an executable script

    The list items are the script lines; the shebang is stored separately and
    only prepended when the script is rendered with :func:`str`.

    Examples
    --------
    >>> from pyjob import Script
    >>> script = Script(directory='.', prefix='example', stem='', suffix='.sh')
    >>> script.append('sleep 5')
    >>> print(script)
    #!/bin/bash
    sleep 5

    """

    def __init__(
        self,
        shebang=ScriptProperty.SHELL.shebang,
        directory='.',
        prefix='tmp',
        stem='pyjob',
        suffix=ScriptProperty.SHELL.suffix,
    ):
        """Instantiate a new :obj:`~pyjob.script.Script`

        Parameters
        ----------
        shebang : str, optional
           The Shebang line in the :obj:`~pyjob.script.Script`
        directory : str, optional
           The directory for :obj:`~pyjob.script.Script` storage
        prefix : str, optional
           The :obj:`~pyjob.script.Script` filename prefix
        stem : str, optional
           The :obj:`~pyjob.script.Script` filename middle
        suffix : str, optional
           The :obj:`~pyjob.script.Script` filename suffix

        Raises
        ------
        :exc:`ValueError`
           The suffix is empty or does not contain a ``.``

        """
        self.directory = directory
        self.prefix = prefix
        self.stem = stem
        self.suffix = suffix
        self.shebang = shebang

    def __add__(self, other):
        """Combine two :obj:`~pyjob.script.Script` instances

        The result holds the lines of ``self`` followed by the lines of
        ``other`` and uses ``<self.stem>-<other.stem>`` as stem. Directory and
        prefix are not carried over; the constructor defaults are used.

        Raises
        ------
        :exc:`TypeError`
           The shebang or the suffix of the two scripts differ

        """
        if self.shebang != other.shebang:
            raise TypeError('Invalid shebang combination')
        if self.suffix != other.suffix:
            raise TypeError('Invalid suffix combination')
        combined = Script(stem=self.stem + '-' + other.stem, shebang=self.shebang, suffix=self.suffix)
        # Plain lists are concatenated here on purpose: ``self + other`` would
        # recurse into this method.
        combined.content = list(self) + list(other)
        return combined

    def __str__(self):
        """Content of :obj:`~pyjob.script.Script`, preceded by the shebang if there is one"""
        lines = list(self)
        if self.shebang:
            lines.insert(0, self.shebang)
        return '\n'.join(map(str, lines))

    @property
    def content(self):
        """Getter method for :attr:`~pyjob.script.Script` content"""
        return self

    @content.setter
    def content(self, content):
        """Setter method for :attr:`~pyjob.script.Script` content

        The new content replaces all current lines in place.

        """
        # Materialise the input before touching ``self``: the getter returns
        # ``self``, so ``script.content = script.content`` (or any iterable
        # derived from the script) must not be emptied before it is read.
        self[:] = list(content)

    @property
    def directory(self):
        """Getter method for :attr:`~pyjob.script.Script.directory`"""
        return self._directory

    @directory.setter
    def directory(self, directory):
        """Setter method for :attr:`~pyjob.script.Script.directory`; stored as absolute path"""
        self._directory = os.path.abspath(directory)

    @property
    def log(self):
        """Path to the log of the :obj:`~pyjob.script.Script`"""
        # Everything after the last dot in the path is swapped for ``log``
        return self.path.rsplit('.', 1)[0] + '.log'

    @property
    def path(self):
        """Path to the :obj:`~pyjob.script.Script`"""
        return os.path.join(self.directory, self.prefix + self.stem + self.suffix)

    @property
    def shebang(self):
        """:obj:`~pyjob.script.Script` shebang"""
        return self._shebang

    @shebang.setter
    def shebang(self, value):
        """:obj:`~pyjob.script.Script` shebang"""
        self._shebang = value

    @property
    def suffix(self):
        """:obj:`~pyjob.script.Script` file suffix"""
        return self._suffix

    @suffix.setter
    def suffix(self, value):
        """:obj:`~pyjob.script.Script` file suffix

        Raises
        ------
        :exc:`ValueError`
           The suffix is None, empty or does not contain a ``.``

        """
        if not value or '.' not in value:
            raise ValueError('Script suffix required!')
        self._suffix = value

    def write(self):
        """Write the :obj:`~pyjob.script.Script` to :attr:`~pyjob.script.Script.path`"""
        fname = self.path
        with open(fname, 'w') as f_out:
            f_out.write(str(self))
        # NOTE(review): 0o777 also makes the script world-writable - confirm
        # this is intended before tightening the mode.
        os.chmod(fname, 0o777)

    def cleanup(self):
        """Cleanup :attr:`~pyjob.script.Script.path` and :attr:`~pyjob.script.Script.log` files if they exist"""
        # The ``.jobs`` file is for cluster tasks
        jobs_path = self.path.replace('.script', '.jobs')
        for target in (self.path, self.log, jobs_path):
            if os.path.isfile(target):
                os.remove(target)

    @staticmethod
    def read(path):
        """Read a script file to construct a :obj:`~pyjob.script.Script`

        Examples
        --------
        >>> from pyjob import read_script
        >>> script = read_script('./example.sh')
        >>> print(script)
        #!/bin/bash
        sleep 5

        Parameters
        ----------
        path : str
           The path to a script file

        Returns
        -------
        :obj:`~pyjob.script.Script`
           A :obj:`~pyjob.script.Script` instance

        Raises
        ------
        :exc:`ValueError`
           The file name has no extension to use as script suffix

        """
        directory, fname = os.path.split(path)
        fname, ext = os.path.splitext(fname)
        script = Script(directory=directory, prefix='', stem=fname, suffix=ext)
        with open(path, 'r') as f:
            lines = f.read().splitlines()
        # A leading ``#!`` line is kept as shebang and not as script content
        if lines and lines[0].startswith('#!'):
            script.shebang = lines.pop(0)
        else:
            script.shebang = ''
        script.extend(lines)
        return script
class LocalScriptCreator(object):
    """Create :obj:`~pyjob.script.Script` instances by mapping ``func`` over
    ``iterable`` with a pool and gather them in a
    :obj:`~pyjob.script.ScriptCollector`.

    Examples
    --------
    >>> from pyjob.script import LocalScriptCreator, Script
    >>> script_creator = LocalScriptCreator(func, iterable, processes)
    >>> collector = script_creator.collector

    """

    def __init__(self, func=None, iterable=None, processes=1):
        """Instantiate a new :obj:`~pyjob.script.LocalScriptCreator`

        Parameters
        ----------
        func : func
           function to create :obj:`~pyjob.script.Script` with custom command
        iterable : iterable
           iterable argument to input into func
        processes : int
           Number of processes to generate scripts with

        """
        self.func = func
        self.iterable = iterable
        self.processes = processes

    def __call__(self, i):
        """Apply ``func`` to a single element; this is what the pool maps over"""
        return self.func(i)

    @property
    def collector(self):
        """A new :obj:`~pyjob.script.ScriptCollector` filled with the result of ``func`` for every element of ``iterable``"""
        gathered = ScriptCollector(None)
        with Pool(processes=self.processes) as workers:
            produced = workers.map(self, self.iterable)
            gathered.add(produced)
        return gathered
def is_valid_script_path(fname):
    """Validate a script path

    The decision is delegated to :func:`~pyjob.cexec.is_exe`.

    Parameters
    ----------
    fname : str
       The path to a script file

    Returns
    -------
    bool

    """
    verdict = is_exe(fname)
    return verdict