Source code for pyjob.task

# MIT License
#
# Copyright (c) 2017-18 Felix Simkovic
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__author__ = 'Felix Simkovic'
__version__ = '1.0'

import abc
import logging
import os
import time

from pyjob import cexec, config
from pyjob.exception import PyJobError, PyJobExecutableNotFoundError, PyJobTaskLockedError
from pyjob.script import ScriptCollector

ABC = abc.ABCMeta('ABC', (object,), {})
logger = logging.getLogger(__name__)


[docs]class Task(ABC): """Abstract base class for executable tasks""" def __init__(self, script, *args, **kwargs): """Instantiate a new :obj:`~pyjob.task.Task` Parameters ---------- script : :obj:`~pyjob.script.ScriptCollector`, :obj:`~pyjob.script.Script`, str, list, tuple A :obj:`str`, :obj:`list` or :obj:`tuple` of one or more script paths """ self.pid = None self.locked = False if isinstance(script, ScriptCollector): self.script_collector = script else: self.script_collector = ScriptCollector(script) # These arguments are universal to all Task entities self.directory = os.path.abspath(kwargs.get('directory') or config.get('directory') or '.') self.nprocesses = kwargs.get('processes') or config.get('processes') or 1 def __del__(self): """Exit function at instance deletion""" if not self.locked: self.lock() self.close() def __enter__(self): """Contextmanager entry function Note ---- For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_. """ return self def __exit__(self, *exc): """Contextmanager exit function Note ---- For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_. """ if not self.locked: self.lock() self.close() def __repr__(self): """Representation of the :obj:`~pyjob.task.Task`""" return '{}(pid={})'.format(self.__class__.__name__, self.pid) # ------------------ Abstract methods and properties ------------------ @abc.abstractproperty def info(self): # pragma: no cover """Abstract property to provide info about the :obj:`~pyjob.task.Task`""" pass
[docs] @abc.abstractmethod def close(self): # pragma: no cover """Abstract method to end :obj:`~pyjob.task.Task`""" pass
[docs] @abc.abstractmethod def kill(self): # pragma: no cover """Abstract method to forcefully terminate :obj:`~pyjob.task.Task`""" pass
@abc.abstractmethod def _run(self): # pragma: no cover """Abstract property to start execution of the :obj:`~pyjob.task.Task`""" pass # ------------------ Other task-specific general methods ------------------ @property def completed(self): """Boolean to indicate :obj:`~pyjob.task.Task` completion""" return self.locked and not bool(self.info) @property def log(self): """The log file path""" return [script.log for script in self.script_collector] @property def script(self): """The script file path""" return [script.path for script in self.script_collector]
[docs] @staticmethod def get_time(minutes): """Return runtime string with format hh:mm:ss to be used in :obj:`~pyjob.task.Task` Parameters ---------- minutes : int Integer with the number of minutes to allocate to runtime Raises ------ :exc:`~pyjob.exception.PyJobError` Argument is not a positive integer """ if isinstance(minutes, int) and minutes > 0: h, m = divmod(minutes, 60) return '{0:02d}:{1:02d}:00'.format(h, m) else: raise PyJobError('Task runtime has to be a positive integer!')
[docs] def add_script(self, script): """Add further scripts to this :obj:`~pyjob.task.Task` Parameters ---------- script : :obj:`~pyjob.script.Script`, str, list, tuple Something representing one or more scripts """ if self.locked: raise PyJobTaskLockedError('This task is locked!') self.script_collector.add(script)
[docs] def lock(self): """Lock this :obj:`~pyjob.task.Task`""" self.locked = True logger.debug('Locked %s [%d]', self.__class__.__name__, self.pid)
[docs] def run(self): """Start the execution of this :obj:`~pyjob.task.Task` Raises ------ :exc:`~pyjob.exception.PyJobError` One or more executable scripts required prior to execution :exc:`~pyjob.exception.PyJobTaskLockedError` Locked task, cannot restart or rerun """ if self.locked: raise PyJobTaskLockedError('This task is locked!') if len(self.script_collector) < 1: raise PyJobError('One or more executable scripts required prior to execution') self.script_collector.dump() self._run() logger.debug('Started execution of %s [%d]', self.__class__.__name__, self.pid) self.lock()
[docs] def wait(self, interval=30, monitor_f=None, success_f=None): """Method to wait for the completion of the current :obj:`~pyjob.task.Task` Parameters ---------- interval : int, optional The interval to wait between checking (in seconds) monitor_f : callable, optional A :obj:`callable` that is regularly invoked success_f : callable, optional A :obj:`callable` to check for early termination of :obj:`~pyjob.task.Task` Note ---- The `success_f` argument needs to accept a log file as input and return a :obj:`bool`. """ def is_successful_run(log): return os.path.isfile(log) and success_f(log) def is_callable_fn(fn): return bool(fn and callable(fn)) check_success = is_callable_fn(success_f) callback = monitor_f if is_callable_fn(monitor_f) else lambda: None if check_success: msg = 'Checking for %s %d success with function %s' logger.debug(msg, self.__class__.__name__, self.pid, success_f.__name__) while not self.completed: if check_success: for log in self.log: if is_successful_run(log): logger.debug("%s %d succeeded, run log: %s", self.__class__.__name__, self.pid, log) self.kill() callback() time.sleep(interval)
[docs]class ClusterTask(Task): """Abstract base class for executable cluster tasks""" def __init__(self, *args, **kwargs): """Instantiate a new :obj:`~pyjob.task.ClusterTask`""" super(ClusterTask, self).__init__(*args, **kwargs) self.dependency = kwargs.get('dependency', []) self.max_array_size = kwargs.get('max_array_size') or config.get('max_array_size') or len(self.script) self.priority = kwargs.get('priority', None) self.queue = kwargs.get('queue') or config.get('queue') self.environment = kwargs.get('environment') or config.get('environment') or 'mpi' self.runtime = kwargs.get('runtime') or config.get('runtime') self.shell = kwargs.get('shell') or config.get('shell') self.name = kwargs.get('name') or config.get('name') or 'pyjob' self.extra = kwargs.get('extra', []) self.cleanup = kwargs.get('cleanup') or config.get('cleanup') or False self.runscript = None self._check_requirements() @abc.abstractmethod def _create_runscript(self): """Utility method to create a :obj:`~pyjob.task.ClusterTask` runscript""" pass @staticmethod def _ensure_exec_available(exe): """Ensure that the specified executable is available in the system Parameters ---------- exe : str The executable to test Raises ------ :exc:`~pyjob.exception.PyJobError` The executable cannot be found """ try: cexec([exe]) except PyJobExecutableNotFoundError: raise PyJobError('Cannot find executable {}. Please ensure environment is set up correctly.'.format(exe)) def _check_requirements(self): """Abstract method to check if the user input meets the requirements for the task execution""" pass
[docs] def close(self): """Close this :obj:`~pyjob.sge.ClusterTask` after completion""" self.wait() if self.cleanup and self.runscript is not None: self.runscript.cleanup()
[docs] def get_array_bash_extension(self, jobsf, offset): """Get the array job bash extension for the ``runscript`` Parameters ---------- jobsf : str The file containing all scripts on a per-line basis offset : int The offset to be applied to the ``JOB_ARRAY_INDEX`` Returns ------- list A list of lines to be written to the ``runscript`` Raises ------ :exc:`ValueError` Invalid offset :exc:`ValueError` Valid job file required """ if jobsf is None or not os.path.isfile(jobsf): raise ValueError('Valid job file required') if offset < 0: raise ValueError('Invalid offset') if offset > 0: script_def = 'script=$(awk "NR==$(({} + {}))" {})'.format(self.__class__.JOB_ARRAY_INDEX, offset, jobsf) else: script_def = 'script=$(awk "NR=={}" {})'.format(self.__class__.JOB_ARRAY_INDEX, jobsf) return [script_def, 'log=$(echo $script | sed "s/\\.${script##*.}/\\.log/")', '$script > $log 2>&1']