# Source code for pyjob.lsf
import logging
import time
import uuid
from pyjob.cexec import cexec
from pyjob.exception import PyJobError, PyJobExecutableNotFoundError
from pyjob.script import Script
from pyjob.task import ClusterTask
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class LoadSharingFacilityTask(ClusterTask):
    """LoadSharingFacility (LSF) executable :obj:`~pyjob.task.Task`

    The task is submitted with ``bsub``, polled with ``bjobs -l`` and
    cancelled with ``bkill``.

    """

    JOB_ARRAY_INDEX = "$LSB_JOBINDEX"
    SCRIPT_DIRECTIVE = "#BSUB"

    # Suffix of the generated submission script; the sibling ``.log`` and
    # ``.jobs`` files of an array job are derived by swapping this suffix.
    _SCRIPT_SUFFIX = ".script"

    # Substrings of the ``bjobs -l`` output that are taken to mean the job is
    # no longer active. "is not found" is deliberately NOT listed: a freshly
    # submitted job may be briefly invisible to ``bjobs``.
    # NOTE(review): the exit markers follow the LSF ``bjobs -l`` documentation;
    # confirm the wording against the LSF version deployed on site.
    _FINISHED_MARKERS = (
        "Done successfully",
        "Exited with exit code",
        "Exited by signal",
        "Completed <exit>",
    )

    @property
    def info(self):
        """:obj:`~pyjob.lsf.LoadSharingFacilityTask` information

        Returns
        -------
        dict
           Empty if the task has no job id, ``bjobs`` is unavailable or the
           job is reported as finished; otherwise the keys ``job_number``
           and ``status``.

        """
        if self.pid is None:
            return {}
        try:
            stdout = cexec(["bjobs", "-l", str(self.pid)], permit_nonzero=True)
        except PyJobExecutableNotFoundError:
            return {}
        # A failed or killed job must be reported as finished too, otherwise
        # it would look "Running" forever.
        if any(marker in stdout for marker in self._FINISHED_MARKERS):
            return {}
        return {"job_number": self.pid, "status": "Running"}

    def _check_requirements(self):
        """Check if the requirements for task execution are met"""
        self._ensure_exec_available("bjobs")

    def kill(self):
        """Immediately terminate the :obj:`~pyjob.lsf.LoadSharingFacilityTask`

        Nothing is done if the task has no job id.

        Raises
        ------
        :exc:`RuntimeError`
           Cannot delete :obj:`~pyjob.lsf.LoadSharingFacilityTask`

        """
        if self.pid is None:
            return
        stdout = cexec(["bkill", str(self.pid)], permit_nonzero=True)
        if "is in progress" in stdout:
            # Retry with ``-b`` and pause before inspecting the reply.
            stdout = cexec(["bkill", "-b", str(self.pid)], permit_nonzero=True)
            time.sleep(10)
        accepted_replies = (
            "has already finished",
            "is being terminated",
            "is in progress",
        )
        if not any(reply in stdout for reply in accepted_replies):
            raise RuntimeError("Cannot delete task!")
        logger.debug("Terminated task: %d", self.pid)

    def _run(self):
        """Method to initialise :obj:`~pyjob.lsf.LoadSharingFacilityTask` execution

        Raises
        ------
        :exc:`~pyjob.exception.PyJobError`
           The job id cannot be extracted from the ``bsub`` output

        """
        self.runscript = self._create_runscript()
        self.runscript.write()
        # The script text is handed to ``bsub`` on standard input.
        stdout = cexec(["bsub"], stdin=str(self.runscript), cwd=self.directory)
        self.pid = self._parse_job_id(stdout)
        logger.debug(
            "%s [%d] submission script is %s",
            self.__class__.__qualname__,
            self.pid,
            self.runscript.path,
        )

    @staticmethod
    def _parse_job_id(stdout):
        """Return the first ``<digits>`` token of the ``bsub`` output as :obj:`int`"""
        # Scanning all tokens (rather than assuming the id is the second one)
        # tolerates extra text printed ahead of the ``Job <id> ...`` line.
        for token in stdout.split():
            candidate = token[1:-1]
            if token.startswith("<") and token.endswith(">") and candidate.isdecimal():
                return int(candidate)
        raise PyJobError(f"Cannot determine LSF job id from bsub output: {stdout!r}")

    def _create_runscript(self):
        """Utility method to create runscript

        Returns
        -------
        :obj:`~pyjob.script.Script`
           Script holding the ``#BSUB`` directives followed by either the
           single command or the job-array dispatch lines.

        """
        runscript = Script(
            directory=self.directory,
            prefix="lsf_",
            suffix=self._SCRIPT_SUFFIX,
            stem=str(uuid.uuid1().int),
        )
        directive = self.__class__.SCRIPT_DIRECTIVE

        def add_directive(option):
            # Every scheduler option becomes one "<directive> <option>" line.
            runscript.append(directive + " " + option)

        if self.dependency:
            # NOTE(review): LSF documents done()/ended()/exit()/started() as
            # dependency conditions; ``deps()`` is kept unchanged here --
            # confirm it is accepted by the target scheduler.
            conditions = " && ".join(f"deps({d})" for d in self.dependency)
            add_directive("-w {}".format(conditions))

        # Options that are emitted verbatim whenever their value is truthy.
        flagged_values = (
            ("-cwd", self.directory),
            ("-sp", self.priority),
            ("-q", self.queue),
            ("-W", self.runtime),
            ("-L", self.shell),
        )
        for flag, value in flagged_values:
            if value:
                add_directive(f"{flag} {value}")

        if self.nprocesses:
            add_directive(f'-R "span[ptile={self.nprocesses}]"')
        if self.extra:
            add_directive(" ".join(map(str, self.extra)))

        if len(self.script) > 1:
            # Swap only the trailing suffix: ``str.replace`` would also
            # rewrite a ".script" occurring in a directory name.
            base = runscript.path
            if base.endswith(self._SCRIPT_SUFFIX):
                base = base[: -len(self._SCRIPT_SUFFIX)]
            logf = base + ".log"
            jobsf = base + ".jobs"
            # One script path per line; the array extension reads from this file.
            with open(jobsf, "w") as f_out:
                f_out.write("\n".join(self.script))
            add_directive(f"-J {self.name}[1-{len(self.script)}]%{self.max_array_size}")
            add_directive(f"-o {logf}")
            runscript.extend(self.get_array_bash_extension(jobsf, 1))
        else:
            add_directive(f"-J {self.name}")
            add_directive(f"-o {self.log[0]}")
            runscript.append(self.script[0])
        return runscript