Source code for pyjob.sge

import logging
import re
import uuid
from enum import Enum

from pyjob.cexec import cexec
from pyjob.exception import PyJobError, PyJobExecutableNotFoundError
from pyjob.script import Script
from pyjob.task import ClusterTask

logger = logging.getLogger(__name__)

RE_LINE_SPLIT = re.compile(r":\s+")
RE_PID_MATCH = re.compile(r"Your job.*has been submitted")


[docs]class SGEConfigParameter(Enum): ENVIRONMENT = 1 QUEUE = 2
[docs]class SunGridEngineTask(ClusterTask): """SunGridEngine executable :obj:`~pyjob.task.Task`""" JOB_ARRAY_INDEX = "$SGE_TASK_ID" SCRIPT_DIRECTIVE = "#$" _sge_avail_configs_by_env = {} @property def info(self): """:obj:`~pyjob.sge.SunGridEngineTask` information""" if self.pid is None: return {} try: stdout = cexec(["qstat", "-j", str(self.pid)], permit_nonzero=True) except PyJobExecutableNotFoundError: return {} data = {} for line in stdout.splitlines(): line = line.strip() if "jobs do not exist" in line: return data if not line or "=" * 30 in line: continue else: kv = RE_LINE_SPLIT.split(line, 1) if len(kv) == 2: data[kv[0]] = kv[1] return data
[docs] @classmethod def get_sge_avail_configs(cls, param): """Get the set of available configurations for a given SGE parameter Parameters ---------- param : :obj:~SGEConfigParameter The parameter to be tested Returns ------- set A set with the available configurations for the parameter of interest Raises ------ :exc:`ValueError` Parameter is not found in :obj:~SGEConfigParameter """ if param in cls._sge_avail_configs_by_env: return cls._sge_avail_configs_by_env[param] if SGEConfigParameter(param) == SGEConfigParameter.ENVIRONMENT: cmd = ["qconf", "-spl"] elif SGEConfigParameter(param) == SGEConfigParameter.QUEUE: cmd = ["qconf", "-sql"] else: raise ValueError("Requested SGE parameter is not supported!") stdout = cexec(cmd, permit_nonzero=True) config = [] for line in stdout.splitlines(): line = line.split() if len(line) > 1: break else: config.append(line[0].encode("utf-8")) cls._sge_avail_configs_by_env[param] = set(config) return cls._sge_avail_configs_by_env[param]
def _check_requirements(self): """Check if the requirements for task execution are met""" self._ensure_exec_available("qstat") sge_config_by_env = self.get_sge_avail_configs(SGEConfigParameter.ENVIRONMENT) if self.environment and self.environment not in sge_config_by_env: raise PyJobError( f"Requested environment {self.environment} cannot be found. " f"List of available environments: {sge_config_by_env}" ) sge_config_by_queue = self.get_sge_avail_configs(SGEConfigParameter.QUEUE) if self.queue and self.queue not in sge_config_by_queue: raise PyJobError( f"Requested queue {self.environment} cannot be found. " f"List of available queues: {sge_config_by_queue}" )
[docs] def kill(self): """Immediately terminate the :obj:`~pyjob.sge.SunGridEngineTask`""" if self.pid is None: return cexec(["qdel", str(self.pid)]) logger.debug("Terminated task: %d", self.pid)
def _run(self): """Method to initialise :obj:`~pyjob.sge.SunGridEngineTask` execution""" self.runscript = self._create_runscript() self.runscript.write() stdout = cexec(["qsub", self.runscript.path], cwd=self.directory) for line in stdout.split("\n"): line = line.strip() if re.match(RE_PID_MATCH, line): if len(self.script) > 1: self.pid = int(line.split()[2].split(".")[0]) else: self.pid = int(line.split()[2]) logger.debug( "%s [%d] submission script is %s", self.__class__.__qualname__, self.pid, self.runscript.path, ) def _create_runscript(self): """Utility method to create runscript""" runscript = Script( directory=self.directory, prefix="sge_", suffix=".script", stem=str(uuid.uuid1().int), ) runscript.append(self.__class__.SCRIPT_DIRECTIVE + " -V") runscript.append(self.__class__.SCRIPT_DIRECTIVE + " -w e") runscript.append(self.__class__.SCRIPT_DIRECTIVE + " -j yes") runscript.append(self.__class__.SCRIPT_DIRECTIVE + f" -N {self.name}") if self.dependency: cmd = f'-hold_jid {",".join(map(str, self.dependency))}' runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.priority: cmd = f"-p {self.priority}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.queue: cmd = f"-q {self.queue}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.runtime: cmd = f"-l h_rt={self.get_time(self.runtime)}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.shell: cmd = f"-S {self.shell}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.nprocesses and self.environment: cmd = f"-pe {self.environment} {self.nprocesses}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.directory: cmd = f"-wd {self.directory}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if self.extra: cmd = " ".join(map(str, self.extra)) runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) if len(self.script) > 1: logf = runscript.path.replace(".script", ".log") jobsf = runscript.path.replace(".script", ".jobs") with open(jobsf, "w") as f_out: f_out.write("\n".join(self.script)) cmd = f"-t 1-{len(self.script)} -tc {self.max_array_size}" runscript.append(self.__class__.SCRIPT_DIRECTIVE + " " + cmd) runscript.append(self.__class__.SCRIPT_DIRECTIVE + f" -o {logf}") runscript.extend(self.get_array_bash_extension(jobsf, 0)) else: runscript.append(self.__class__.SCRIPT_DIRECTIVE + f" -o {self.log[0]}") runscript.append(self.script[0]) return runscript