# MIT License
#
# Copyright (c) 2017-18 Felix Simkovic
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__author__ = 'Felix Simkovic'
__version__ = '1.0'
import logging
import multiprocessing
import os
import sys
import time
import uuid
from pyjob.cexec import cexec
from pyjob.task import Task
CPU_COUNT = multiprocessing.cpu_count()
logger = logging.getLogger(__name__)
[docs]class LocalTask(Task):
"""Locally executable :obj:`~pyjob.task.Task`"""
def __init__(self, *args, **kwargs):
"""Instantiate a new :obj:`~pyjob.local.LocalTask`"""
super(LocalTask, self).__init__(*args, **kwargs)
self.queue = multiprocessing.Queue()
self.kill_switch = multiprocessing.Event()
self.processes = []
self.chdir = kwargs.get('chdir', False)
self.permit_nonzero = kwargs.get('permit_nonzero', False)
self._killed = False
@property
def nprocesses(self):
"""Getter for the number of concurrent :obj:`~pyjob.local.LocalProcess`"""
return self._nprocesses
@nprocesses.setter
def nprocesses(self, nprocesses):
"""Setter for the number of concurrent :obj:`~pyjob.local.LocalProcess`"""
if nprocesses > CPU_COUNT:
logger.warning('More processes requested than available CPUs')
self._nprocesses = nprocesses
@property
def info(self):
""":obj:`~pyjob.local.LocalTask` information"""
if any(proc.is_alive() for proc in self.processes):
return {'job_number': self.pid, 'status': 'Running'}
return {}
[docs] def close(self):
"""Close this :obj:`~pyjob.local.LocalTask` after completion"""
if self._killed:
return
for proc in self.processes:
proc.join()
self.kill()
[docs] def kill(self):
"""Immediately terminate the :obj:`~pyjob.local.LocalTask`"""
if self._killed:
return
if not self.kill_switch.is_set():
self.kill_switch.set()
# This is a requirement to avoid access to memory-inaccessible processes
# The queue gets flushed by triggering the kill_switch
for proc in self.processes:
proc.join()
for proc in self.processes:
proc.terminate()
logger.debug("Terminated task: %d", self.pid)
self._killed = True
def _run(self):
"""Method to initialise :obj:`~pyjob.local.LocalTask` execution"""
if self._killed:
return
for _ in range(self.nprocesses):
proc = LocalProcess(
self.queue,
self.kill_switch,
directory=self.directory,
chdir=self.chdir,
permit_nonzero=self.permit_nonzero,
)
proc.start()
self.processes.append(proc)
for script in self.script:
self.queue.put(script)
for _ in self.processes:
self.queue.put(None)
self.queue.close()
self.pid = uuid.uuid1().int
time.sleep(0.1)
[docs]class LocalProcess(multiprocessing.Process):
"""Extension to :obj:`multiprocessing.Process` for :obj:`~pyjob.local.LocalTask`"""
def __init__(self, queue, kill_switch, directory=None, permit_nonzero=False, chdir=False):
"""Instantiate a :obj:`~pyjob.local.LocalProcess`
Parameters
----------
queue : :obj:`~multiprocessing.Queue`
An instance of a :obj:`~multiprocessing.Queue`
kill_switch : obj
An instance of a :obj:`~multiprocessing.Event`
directory : str, optional
The directory to execute the jobs in
permit_nonzero : bool, optional
Allow non-zero return codes
Warning
-------
This object should only be instantiated by :obj:`~pyjob.local.LocalTask`!
"""
super(LocalProcess, self).__init__()
self.queue = queue
self.kill_switch = kill_switch
self.directory = directory
self.permit_nonzero = permit_nonzero
self.chdir = chdir
[docs] def run(self):
"""Method representing the :obj:`~pyjob.local.LocalProcess` activity"""
for job in iter(self.queue.get, None):
if self.kill_switch.is_set():
continue
if self.chdir:
directory = os.path.dirname(job)
else:
directory = self.directory
log = os.path.splitext(job)[0] + '.log'
with open(log, 'w') as f:
cexec([job], cwd=directory, stdout=f, permit_nonzero=self.permit_nonzero)