Source code for pyjob.cexec
import logging
import os
import signal
import subprocess
import sys
from pyjob.exception import PyJobExecutableNotFoundError, PyJobExecutionError
from pyjob.misc import decode
logger = logging.getLogger(__name__)
[docs]def is_exe(fpath):
"""Status to indicate if a file is an executable
Parameters
----------
fpath : str
The path to the file to be tested
Returns
-------
bool
"""
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
[docs]def which(executable):
"""Python-based mirror of UNIX ``which`` command
Parameters
----------
executable : str
The path or name for an executable
Returns
-------
str
The absolute path to the executable, or ``None`` if not found
Credits
-------
https://stackoverflow.com/a/377028/3046533
"""
fpath, fname = os.path.split(executable)
if fpath:
if is_exe(executable):
return executable
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, executable)
if is_exe(exe_file):
return exe_file
[docs]def cexec(cmd, permit_nonzero=False, **kwargs):
"""Function to execute a command
Parameters
----------
cmd : list
The command to call
permit_nonzero : bool, optional
Allow non-zero return codes [default: False]
**kwargs : dict, option
Any keyword arguments accepted by :obj:`~subprocess.Popen`
Returns
-------
str
The processes' standard out
Raises
------
:exc:`PyJobExecutableNotFoundError`
Cannot find executable
:exc:`PyJobExecutionError`
Execution exited with non-zero return code
"""
executable = which(cmd[0])
if executable is None:
raise PyJobExecutableNotFoundError(f"Cannot find executable: {cmd[0]}")
cmd[0] = executable
logger.debug('Executing "%s"', " ".join(cmd))
if os.name == "nt":
kwargs.setdefault("bufsize", 0)
kwargs.setdefault("shell", "False")
kwargs.setdefault("cwd", os.getcwd())
kwargs.setdefault("stdout", subprocess.PIPE)
kwargs.setdefault("stderr", subprocess.STDOUT)
stdinstr = kwargs.get("stdin", None)
if stdinstr and isinstance(stdinstr, str):
kwargs["stdin"] = subprocess.PIPE
try:
p = subprocess.Popen(cmd, **kwargs)
if stdinstr:
stdinstr = stdinstr.encode()
stdout, stderr = p.communicate(input=stdinstr)
except (KeyboardInterrupt, SystemExit):
os.kill(p.pid, signal.SIGTERM)
sys.exit(signal.SIGTERM)
else:
if stdout:
stdout = decode(stdout).strip()
if p.returncode == 0:
return stdout
elif permit_nonzero:
logger.debug(
"Ignoring non-zero returncode %d for '%s'", p.returncode, " ".join(cmd)
)
return stdout
else:
raise PyJobExecutionError(
f"Execution of '{' '.join(cmd)}' exited with non-zero return code ({p.returncode})"
)