import abc
import logging
import os
import time
from pyjob import cexec, config
from pyjob.exception import (
PyJobError,
PyJobExecutableNotFoundError,
PyJobTaskLockedError,
)
from pyjob.script import ScriptCollector
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class Task(abc.ABC):
    """Abstract base class for executable tasks

    Concrete subclasses implement :attr:`info`, :meth:`close`, :meth:`kill` and
    :meth:`_run`. A task becomes "locked" once it has been started (or when it is
    exited/deleted); a locked task no longer accepts scripts and cannot be rerun.
    """

    def __init__(self, script, *args, **kwargs):
        """Instantiate a new :obj:`~pyjob.task.Task`

        Parameters
        ----------
        script : :obj:`~pyjob.script.ScriptCollector`, :obj:`~pyjob.script.Script`, str, list, tuple
            A :obj:`str`, :obj:`list` or :obj:`tuple` of one or more script paths
        directory : str, optional
            Working directory; falls back to the pyjob ``config`` value and then ``"."``
        processes : int, optional
            Number of processes; falls back to the pyjob ``config`` value and then 1
        """
        self.pid = None
        self.locked = False
        if isinstance(script, ScriptCollector):
            self.script_collector = script
        else:
            self.script_collector = ScriptCollector(script)
        self.directory = os.path.abspath(
            kwargs.get("directory") or config.get("directory") or "."
        )
        self.nprocesses = kwargs.get("processes") or config.get("processes") or 1

    def __del__(self):
        """Exit function at instance deletion

        Locks the task if it is not locked yet, then calls :meth:`close`.
        """
        if not self.locked:
            self.lock()
        self.close()

    def __enter__(self):
        """Contextmanager entry function

        Note
        ----
        For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_.
        """
        return self

    def __exit__(self, *exc):
        """Contextmanager exit function

        Locks the task if it is not locked yet, then calls :meth:`close`.
        Exceptions raised inside the ``with`` body are not suppressed.

        Note
        ----
        For further details see `PEP 343 <https://www.python.org/dev/peps/pep-0343/>`_.
        """
        if not self.locked:
            self.lock()
        self.close()

    def __repr__(self):
        """Representation of the :obj:`~pyjob.task.Task`"""
        return f"{self.__class__.__qualname__}(pid={self.pid})"

    # ------------------ Abstract methods and properties ------------------

    @property
    @abc.abstractmethod
    def info(self):  # pragma: no cover
        """Abstract property to provide info about the :obj:`~pyjob.task.Task`"""

    @abc.abstractmethod
    def close(self):  # pragma: no cover
        """Abstract method to end :obj:`~pyjob.task.Task`"""

    @abc.abstractmethod
    def kill(self):  # pragma: no cover
        """Abstract method to forcefully terminate :obj:`~pyjob.task.Task`"""

    @abc.abstractmethod
    def _run(self):  # pragma: no cover
        """Abstract method to start execution of the :obj:`~pyjob.task.Task`"""

    # ------------------ Other task-specific general methods ------------------

    @property
    def completed(self):
        """Boolean to indicate :obj:`~pyjob.task.Task` completion

        A task counts as completed once it is locked and :attr:`info` is falsy.
        """
        return self.locked and not bool(self.info)

    @property
    def log(self):
        """The log file path of every script in this task, as a :obj:`list`"""
        return [script.log for script in self.script_collector]

    @property
    def script(self):
        """The script file path of every script in this task, as a :obj:`list`"""
        return [script.path for script in self.script_collector]

    @staticmethod
    def get_time(minutes):
        """Return runtime string with format hh:mm:ss to be used in :obj:`~pyjob.task.Task`

        Parameters
        ----------
        minutes : int
            Integer with the number of minutes to allocate to runtime

        Returns
        -------
        str
            The runtime formatted as ``hh:mm:00``

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
            Argument is not a positive integer
        """
        # bool is a subclass of int, but True/False are not meaningful runtimes.
        if isinstance(minutes, bool) or not isinstance(minutes, int) or minutes <= 0:
            raise PyJobError("Task runtime has to be a positive integer!")
        hours, mins = divmod(minutes, 60)
        return f"{hours:02d}:{mins:02d}:00"

    def add_script(self, script):
        """Add further scripts to this :obj:`~pyjob.task.Task`

        Parameters
        ----------
        script : :obj:`~pyjob.script.Script`, str, list, tuple
            Something representing one or more scripts

        Raises
        ------
        :exc:`~pyjob.exception.PyJobTaskLockedError`
            Locked task, cannot add further scripts
        """
        if self.locked:
            raise PyJobTaskLockedError("This task is locked!")
        self.script_collector.add(script)

    def lock(self):
        """Lock this :obj:`~pyjob.task.Task`"""
        self.locked = True
        # ``%s`` rather than ``%d``: ``pid`` is still None when a task that was
        # never run gets locked from ``__del__``/``__exit__``.
        logger.debug("Locked %s [%s]", self.__class__.__qualname__, self.pid)

    def run(self):
        """Start the execution of this :obj:`~pyjob.task.Task`

        Dumps the collected scripts, starts them via :meth:`_run` and locks the task.

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
            One or more executable scripts required prior to execution
        :exc:`~pyjob.exception.PyJobTaskLockedError`
            Locked task, cannot restart or rerun
        """
        if self.locked:
            raise PyJobTaskLockedError("This task is locked!")
        if len(self.script_collector) < 1:
            raise PyJobError(
                "One or more executable scripts required prior to execution"
            )
        self.script_collector.dump()
        self._run()
        logger.debug(
            "Started execution of %s [%s]", self.__class__.__qualname__, self.pid
        )
        self.lock()

    def wait(self, interval=30, monitor_f=None, success_f=None):
        """Method to wait for the completion of the current :obj:`~pyjob.task.Task`

        Parameters
        ----------
        interval : int, optional
            The interval to wait between checking (in seconds)
        monitor_f : callable, optional
            A :obj:`callable` that is regularly invoked
        success_f : callable, optional
            A :obj:`callable` to check for early termination of :obj:`~pyjob.task.Task`

        Note
        ----
        The `success_f` argument needs to accept a log file as input and return
        a :obj:`bool`. When it reports success for an existing log file, the task
        is killed (at most once per polling round) and polling continues until the
        task reports completion.

        Completion requires the task to be locked (see :attr:`completed`), so
        waiting on a task that has never been run or locked polls indefinitely.
        """

        def is_successful_run(log):
            return os.path.isfile(log) and success_f(log)

        def is_callable_fn(fn):
            return bool(fn and callable(fn))

        check_success = is_callable_fn(success_f)
        callback = monitor_f if is_callable_fn(monitor_f) else lambda: None
        task_name = self.__class__.__qualname__

        if check_success:
            # Not every callable has ``__name__`` (e.g. functools.partial objects).
            success_name = getattr(success_f, "__name__", repr(success_f))
            msg = "Checking for %s %s success with function %s"
            logger.debug(msg, task_name, self.pid, success_name)

        while not self.completed:
            if check_success:
                for log in self.log:
                    if is_successful_run(log):
                        logger.debug(
                            "%s %s succeeded, run log: %s",
                            task_name,
                            self.pid,
                            log,
                        )
                        self.kill()
                        # One kill per polling round is enough; further
                        # successful logs would only re-issue it.
                        break
            callback()
            time.sleep(interval)
class ClusterTask(Task):
    """Abstract base class for executable cluster tasks"""

    def __init__(self, *args, **kwargs):
        """Instantiate a new :obj:`~pyjob.task.ClusterTask`

        Positional arguments and ``directory``/``processes`` are handled by
        :obj:`~pyjob.task.Task`. The cluster-specific keyword arguments below are
        read from ``kwargs``. Where noted, they fall back to the pyjob ``config``
        value and then to a hard-coded default.

        Parameters
        ----------
        dependency : list, optional
            Defaults to ``[]``
        max_array_size : int, optional
            Falls back to ``config``, then to the number of scripts present at
            construction time (scripts added later are not counted)
        priority : optional
            Defaults to ``None``
        queue : str, optional
            Falls back to ``config``
        environment : str, optional
            Falls back to ``config``, then ``"mpi"``
        runtime : int, optional
            Falls back to ``config``
        shell : str, optional
            Falls back to ``config``
        name : str, optional
            Falls back to ``config``, then ``"pyjob"``
        extra : list, optional
            Defaults to ``[]``
        cleanup : bool, optional
            Falls back to ``config``, then ``False``
        """
        super().__init__(*args, **kwargs)
        self.dependency = kwargs.get("dependency", [])
        self.max_array_size = (
            kwargs.get("max_array_size")
            or config.get("max_array_size")
            or len(self.script)
        )
        self.priority = kwargs.get("priority", None)
        self.queue = kwargs.get("queue") or config.get("queue")
        self.environment = (
            kwargs.get("environment") or config.get("environment") or "mpi"
        )
        self.runtime = kwargs.get("runtime") or config.get("runtime")
        self.shell = kwargs.get("shell") or config.get("shell")
        self.name = kwargs.get("name") or config.get("name") or "pyjob"
        self.extra = kwargs.get("extra", [])
        self.cleanup = kwargs.get("cleanup") or config.get("cleanup") or False
        self.runscript = None
        # Runs last so the hook can inspect every attribute set above.
        self._check_requirements()

    @abc.abstractmethod
    def _create_runscript(self):
        """Utility method to create a :obj:`~pyjob.task.ClusterTask` runscript"""

    @staticmethod
    def _ensure_exec_available(exe):
        """Ensure that the specified executable is available in the system

        The check invokes ``exe`` without arguments via ``cexec``.

        Parameters
        ----------
        exe : str
            The executable to test

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
            The executable cannot be found
        """
        try:
            cexec([exe])
        except PyJobExecutableNotFoundError as err:
            raise PyJobError(
                f"Cannot find executable {exe}. Please ensure environment is set up correctly."
            ) from err

    def _check_requirements(self):
        """Hook to check if the user input meets the requirements for the task execution

        Called at the end of ``__init__``. This base implementation does nothing;
        subclasses may override it to validate their configuration.
        """

    def close(self):
        """Close this :obj:`~pyjob.task.ClusterTask` after completion

        Waits for completion with the default polling settings of :meth:`wait`,
        then cleans up the runscript if ``cleanup`` is enabled and a runscript
        was created.
        """
        self.wait()
        if self.cleanup and self.runscript is not None:
            self.runscript.cleanup()

    def get_array_bash_extension(self, jobsf, offset):
        """Get the array job bash extension for the ``runscript``

        The returned lines select the script for the current array index from
        ``jobsf``, derive a ``.log`` path from it and run the script with stdout
        and stderr redirected to that log. ``JOB_ARRAY_INDEX`` is read from the
        concrete class; ``ClusterTask`` itself does not define it here.

        Parameters
        ----------
        jobsf : str
            The file containing all scripts on a per-line basis
        offset : int
            The offset to be applied to the ``JOB_ARRAY_INDEX``

        Returns
        -------
        list
            A list of lines to be written to the ``runscript``

        Raises
        ------
        :exc:`ValueError`
            Invalid offset
        :exc:`ValueError`
            Valid job file required
        """
        if jobsf is None or not os.path.isfile(jobsf):
            raise ValueError("Valid job file required")
        if offset < 0:
            raise ValueError("Invalid offset")
        job_array_index = self.__class__.JOB_ARRAY_INDEX
        if offset > 0:
            script_def = (
                f'script=$(awk "NR==$(({job_array_index} + {offset}))" {jobsf})'
            )
        else:
            script_def = f'script=$(awk "NR=={job_array_index}" {jobsf})'
        # NOTE(review): sed substitutes only the first ".<ext>" match, so a
        # directory component containing the same ".<ext>" would be rewritten
        # instead; $script, $log and jobsf are also unquoted (paths with spaces
        # break). Left unchanged because callers may depend on the exact lines.
        return [
            script_def,
            'log=$(echo $script | sed "s/\\.${script##*.}/\\.log/")',
            "$script > $log 2>&1",
        ]