Source code for pyjob.local
import logging
import multiprocessing
import os
import sys
import time
import uuid
from pyjob.cexec import cexec
from pyjob.task import Task
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# CPU count reported by multiprocessing; compared against the requested
# number of worker processes in LocalTask.nprocesses.
CPU_COUNT = multiprocessing.cpu_count()
class LocalTask(Task):
    """Locally executable :obj:`~pyjob.task.Task`

    Scripts are handed to a pool of :obj:`~pyjob.local.LocalProcess` workers
    through a shared :obj:`multiprocessing.Queue`; a shared
    :obj:`multiprocessing.Event` acts as the kill switch for all workers.
    """

    def __init__(self, *args, **kwargs):
        """Instantiate a new :obj:`~pyjob.local.LocalTask`

        All arguments are passed on to :obj:`~pyjob.task.Task`. The keyword
        arguments ``chdir`` and ``permit_nonzero`` (both default ``False``)
        are additionally stored here and forwarded to every worker.
        """
        super().__init__(*args, **kwargs)
        # Inter-process plumbing shared with every LocalProcess worker.
        self.queue = multiprocessing.Queue()
        self.kill_switch = multiprocessing.Event()
        self.processes = []
        # Worker options read from the caller's keyword arguments.
        lookup = kwargs.get
        self.chdir = lookup("chdir", False)
        self.permit_nonzero = lookup("permit_nonzero", False)
        self._killed = False

    @property
    def nprocesses(self):
        """Number of :obj:`~pyjob.local.LocalProcess` workers to run concurrently"""
        return self._nprocesses

    @nprocesses.setter
    def nprocesses(self, nprocesses):
        """Store the worker count, warning if it exceeds ``CPU_COUNT``"""
        oversubscribed = nprocesses > CPU_COUNT
        if oversubscribed:
            logger.warning("More processes requested than available CPUs")
        self._nprocesses = nprocesses

    @property
    def info(self):
        """Status dictionary: populated while any worker is alive, else empty"""
        running = any(proc.is_alive() for proc in self.processes)
        return {"job_number": self.pid, "status": "Running"} if running else {}

    def close(self):
        """Wait for every worker to exit, then kill this task

        Does nothing if the task has already been killed.
        """
        if not self._killed:
            for worker in self.processes:
                worker.join()
            self.kill()

    def kill(self):
        """Stop this task and terminate its workers

        Setting the kill switch makes workers skip (and thereby drain) every
        script still waiting in the queue. A script that is already executing
        is not interrupted: the workers are joined before being terminated.
        Calling this more than once has no further effect.
        """
        if self._killed:
            return
        if not self.kill_switch.is_set():
            self.kill_switch.set()
        # Joining before terminating avoids touching processes whose memory is
        # no longer accessible; the kill switch lets the queue drain first.
        for worker in self.processes:
            worker.join()
        for worker in self.processes:
            worker.terminate()
        logger.debug("Terminated task: %d", self.pid)
        self._killed = True

    def _run(self):
        """Start the worker pool and enqueue every script

        One ``None`` sentinel is enqueued per worker after the scripts, so each
        worker's ``iter(queue.get, None)`` loop ends once the queue is drained.
        """
        if self._killed:
            return
        for _ in range(self.nprocesses):
            worker = LocalProcess(
                self.queue,
                self.kill_switch,
                directory=self.directory,
                chdir=self.chdir,
                permit_nonzero=self.permit_nonzero,
            )
            worker.start()
            self.processes.append(worker)
        for script in self.script:
            self.queue.put(script)
        for _ in self.processes:
            self.queue.put(None)
        # No further puts from this process.
        self.queue.close()
        self.pid = uuid.uuid1().int
        time.sleep(0.1)
class LocalProcess(multiprocessing.Process):
    """Extension to :obj:`multiprocessing.Process` for :obj:`~pyjob.local.LocalTask`"""

    def __init__(
        self, queue, kill_switch, directory=None, permit_nonzero=False, chdir=False
    ):
        """Instantiate a :obj:`~pyjob.local.LocalProcess`

        Parameters
        ----------
        queue : :obj:`~multiprocessing.Queue`
           An instance of a :obj:`~multiprocessing.Queue` from which script
           paths are read; a ``None`` item ends this worker's loop
        kill_switch : obj
           An instance of a :obj:`~multiprocessing.Event`; once set, remaining
           queued scripts are skipped instead of executed
        directory : str, optional
           The directory to execute the jobs in (ignored when ``chdir`` is set)
        permit_nonzero : bool, optional
           Allow non-zero return codes
        chdir : bool, optional
           Execute each job in the directory that contains its script

        Warning
        -------
        This object should only be instantiated by :obj:`~pyjob.local.LocalTask`!

        """
        super().__init__()
        self.queue = queue
        self.kill_switch = kill_switch
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.chdir = chdir

    def _resolve_job(self, job):
        """Return the ``(script, cwd)`` pair used to execute ``job``

        With ``chdir`` enabled the script path is made absolute first. A bare
        filename would otherwise give an empty working directory. A relative
        path would be resolved a second time relative to the new working
        directory when the child process starts.
        """
        if self.chdir:
            script = os.path.abspath(job)
            return script, os.path.dirname(script)
        return job, self.directory

    def run(self):
        """Method representing the :obj:`~pyjob.local.LocalProcess` activity

        Pulls scripts from the queue until a ``None`` sentinel arrives. Each
        script's output is written to a sibling ``.log`` file. Once the kill
        switch is set, remaining scripts are consumed but not executed.
        """
        for job in iter(self.queue.get, None):
            if self.kill_switch.is_set():
                continue
            script, directory = self._resolve_job(job)
            log = os.path.splitext(script)[0] + ".log"
            with open(log, "w") as f:
                cexec(
                    [script], cwd=directory, stdout=f, permit_nonzero=self.permit_nonzero
                )