# Source code for pyjob.task

import abc
import logging
import os
import time

from pyjob import cexec, config
from pyjob.exception import (
    PyJobError,
    PyJobExecutableNotFoundError,
    PyJobTaskLockedError,
)
from pyjob.script import ScriptCollector

# Module-level logger; Task uses it for debug messages about locking,
# starting execution and early-success detection.
logger = logging.getLogger(__name__)


class Task(abc.ABC):
    """Abstract base class for executable tasks"""

    def __init__(self, script, *args, **kwargs):
        """Instantiate a new :obj:`~pyjob.task.Task`

        Parameters
        ----------
        script : :obj:`~pyjob.script.ScriptCollector`, :obj:`~pyjob.script.Script`, str, list, tuple
           A :obj:`str`, :obj:`list` or :obj:`tuple` of one or more script paths
        **kwargs
           ``directory`` -- working directory (made absolute); falls back to
           ``config.get("directory")`` and then ``"."``.
           ``processes`` -- number of processes; falls back to
           ``config.get("processes")`` and then ``1``.

        """
        # Set before anything that can raise: __del__ reads ``self.locked``
        # and the log messages read ``self.pid``.
        self.pid = None
        self.locked = False
        if isinstance(script, ScriptCollector):
            self.script_collector = script
        else:
            self.script_collector = ScriptCollector(script)
        self.directory = os.path.abspath(
            kwargs.get("directory") or config.get("directory") or "."
        )
        self.nprocesses = kwargs.get("processes") or config.get("processes") or 1

    def __del__(self):
        """Exit function at instance deletion: lock (if needed) and close"""
        if not self.locked:
            self.lock()
        self.close()

    def __enter__(self):
        """Contextmanager entry function

        Note
        ----
        For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_.

        """
        return self

    def __exit__(self, *exc):
        """Contextmanager exit function: lock (if needed) and close

        Exceptions raised inside the ``with`` body are not suppressed
        (this method returns ``None``).

        Note
        ----
        For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_.

        """
        if not self.locked:
            self.lock()
        self.close()

    def __repr__(self):
        """Representation of the :obj:`~pyjob.task.Task`"""
        return f"{self.__class__.__qualname__}(pid={self.pid})"

    # ------------------ Abstract methods and properties ------------------

    @property
    @abc.abstractmethod
    def info(self):  # pragma: no cover
        """Abstract property to provide info about the :obj:`~pyjob.task.Task`

        An empty/falsy value is interpreted by :attr:`completed` as
        "nothing left running".
        """

    @abc.abstractmethod
    def close(self):  # pragma: no cover
        """Abstract method to end :obj:`~pyjob.task.Task`"""

    @abc.abstractmethod
    def kill(self):  # pragma: no cover
        """Abstract method to forcefully terminate :obj:`~pyjob.task.Task`"""

    @abc.abstractmethod
    def _run(self):  # pragma: no cover
        """Abstract method to start execution of the :obj:`~pyjob.task.Task`"""

    # ------------------ Other task-specific general methods ------------------

    @property
    def completed(self):
        """Boolean to indicate :obj:`~pyjob.task.Task` completion

        ``True`` only once the task is locked and :attr:`info` is falsy.
        """
        return self.locked and not bool(self.info)

    @property
    def log(self):
        """The log file paths, one per collected script"""
        return [script.log for script in self.script_collector]

    @property
    def script(self):
        """The script file paths, one per collected script"""
        return [script.path for script in self.script_collector]

    @staticmethod
    def get_time(minutes):
        """Return runtime string with format hh:mm:ss to be used in :obj:`~pyjob.task.Task`

        Parameters
        ----------
        minutes : int
           Integer with the number of minutes to allocate to runtime

        Returns
        -------
        str
           The runtime formatted as ``hh:mm:00``

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           Argument is not a positive integer

        """
        if isinstance(minutes, int) and minutes > 0:
            h, m = divmod(minutes, 60)
            return f"{h:02d}:{m:02d}:00"
        else:
            raise PyJobError("Task runtime has to be a positive integer!")

    def add_script(self, script):
        """Add further scripts to this :obj:`~pyjob.task.Task`

        Parameters
        ----------
        script : :obj:`~pyjob.script.Script`, str, list, tuple
           Something representing one or more scripts

        Raises
        ------
        :exc:`~pyjob.exception.PyJobTaskLockedError`
           Locked task, no scripts can be added

        """
        if self.locked:
            raise PyJobTaskLockedError("This task is locked!")
        self.script_collector.add(script)

    def lock(self):
        """Lock this :obj:`~pyjob.task.Task`"""
        self.locked = True
        # ``%s`` rather than ``%d``: ``pid`` is still ``None`` when a task that
        # was never run is locked (e.g. from ``__del__``/``__exit__``), and
        # ``%d`` would make the log record fail to format.
        logger.debug("Locked %s [%s]", self.__class__.__qualname__, self.pid)

    def run(self):
        """Start the execution of this :obj:`~pyjob.task.Task`

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           One or more executable scripts required prior to execution
        :exc:`~pyjob.exception.PyJobTaskLockedError`
           Locked task, cannot restart or rerun

        """
        if self.locked:
            raise PyJobTaskLockedError("This task is locked!")
        if len(self.script_collector) < 1:
            raise PyJobError(
                "One or more executable scripts required prior to execution"
            )
        # NOTE(review): presumably writes the collected scripts to disk before
        # the subclass launches them -- confirm in pyjob.script.
        self.script_collector.dump()
        self._run()
        logger.debug(
            "Started execution of %s [%s]", self.__class__.__qualname__, self.pid
        )
        # A task runs at most once; locking prevents reruns and script changes.
        self.lock()

    def wait(self, interval=30, monitor_f=None, success_f=None):
        """Method to wait for the completion of the current :obj:`~pyjob.task.Task`

        Parameters
        ----------
        interval : int, optional
           The interval to wait between checking (in seconds)
        monitor_f : callable, optional
           A :obj:`callable` that is regularly invoked
        success_f : callable, optional
           A :obj:`callable` to check for early termination of :obj:`~pyjob.task.Task`

        Note
        ----
        The `success_f` argument needs to accept a log file as input and return
        a :obj:`bool`. As soon as it returns ``True`` for an existing log file,
        the task is killed once.

        Because :attr:`completed` requires the task to be locked, this method
        does not return for a task that has not been run (or locked).

        """

        def is_successful_run(log):
            return os.path.isfile(log) and success_f(log)

        def is_callable_fn(fn):
            return bool(fn and callable(fn))

        check_success = is_callable_fn(success_f)
        callback = monitor_f if is_callable_fn(monitor_f) else lambda: None

        if check_success:
            msg = "Checking for %s %s success with function %s"
            logger.debug(msg, self.__class__.__qualname__, self.pid, success_f.__name__)

        while not self.completed:
            if check_success:
                for log in self.log:
                    if is_successful_run(log):
                        logger.debug(
                            "%s %s succeeded, run log: %s",
                            self.__class__.__qualname__,
                            self.pid,
                            log,
                        )
                        self.kill()
                        # One successful log is enough; do not kill the task
                        # again for every further successful log file.
                        break
            callback()
            time.sleep(interval)
class ClusterTask(Task):
    """Abstract base class for executable cluster tasks"""

    def __init__(self, *args, **kwargs):
        """Instantiate a new :obj:`~pyjob.task.ClusterTask`

        Options are read from ``kwargs``; most fall back to
        ``config.get(<name>)`` and then to a hard-coded default
        (``max_array_size`` -> number of scripts, ``environment`` -> ``"mpi"``,
        ``name`` -> ``"pyjob"``, ``cleanup`` -> ``False``). ``dependency``,
        ``priority`` and ``extra`` are taken from ``kwargs`` only.
        """
        super().__init__(*args, **kwargs)
        self.dependency = kwargs.get("dependency", [])
        self.max_array_size = (
            kwargs.get("max_array_size")
            or config.get("max_array_size")
            or len(self.script)
        )
        self.priority = kwargs.get("priority", None)
        self.queue = kwargs.get("queue") or config.get("queue")
        self.environment = (
            kwargs.get("environment") or config.get("environment") or "mpi"
        )
        self.runtime = kwargs.get("runtime") or config.get("runtime")
        self.shell = kwargs.get("shell") or config.get("shell")
        self.name = kwargs.get("name") or config.get("name") or "pyjob"
        self.extra = kwargs.get("extra", [])
        self.cleanup = kwargs.get("cleanup") or config.get("cleanup") or False
        # NOTE(review): presumably assigned by the subclass's
        # _create_runscript(); close() only cleans it up when it is set.
        self.runscript = None
        self._check_requirements()

    @abc.abstractmethod
    def _create_runscript(self):
        """Utility method to create a :obj:`~pyjob.task.ClusterTask` runscript"""

    @staticmethod
    def _ensure_exec_available(exe):
        """Ensure that the specified executable is available in the system

        Parameters
        ----------
        exe : str
           The executable to test

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           The executable cannot be found

        """
        try:
            cexec([exe])
        except PyJobExecutableNotFoundError as err:
            # Chain explicitly so the original lookup failure stays visible.
            raise PyJobError(
                f"Cannot find executable {exe}. Please ensure environment is set up correctly."
            ) from err

    def _check_requirements(self):
        """Hook to check if the user input meets the requirements for the task execution

        Does nothing by default; subclasses may override it. Called at the end
        of :meth:`__init__`.
        """

    def close(self):
        """Close this :obj:`~pyjob.task.ClusterTask` after completion

        Waits for completion and, if ``cleanup`` is enabled and a
        ``runscript`` exists, cleans the runscript up.
        """
        self.wait()
        if self.cleanup and self.runscript is not None:
            self.runscript.cleanup()

    def get_array_bash_extension(self, jobsf, offset):
        """Get the array job bash extension for the ``runscript``

        Parameters
        ----------
        jobsf : str
           The file containing all scripts on a per-line basis
        offset : int
           The offset to be applied to the ``JOB_ARRAY_INDEX``

        Returns
        -------
        list
           A list of lines to be written to the ``runscript``

        Raises
        ------
        :exc:`ValueError`
           Invalid offset
        :exc:`ValueError`
           Valid job file required

        """
        # Check None first: os.path.isfile(None) raises TypeError.
        if jobsf is None or not os.path.isfile(jobsf):
            raise ValueError("Valid job file required")
        if offset < 0:
            raise ValueError("Invalid offset")
        # Expected to be defined as a class attribute by concrete subclasses
        # (the scheduler's array-index variable).
        job_array_index = self.__class__.JOB_ARRAY_INDEX
        if offset > 0:
            script_def = (
                f'script=$(awk "NR==$(({job_array_index} + {offset}))" {jobsf})'
            )
        else:
            script_def = f'script=$(awk "NR=={job_array_index}" {jobsf})'
        # NOTE(review): the sed pattern is unanchored, so it replaces the
        # first ".<ext>" occurrence in the path, which is not necessarily the
        # final extension (e.g. a directory named "x.sh/").
        return [
            script_def,
            'log=$(echo $script | sed "s/\\.${script##*.}/\\.log/")',
            "$script > $log 2>&1",
        ]