subprocess—Python多进程模块

subprocess—Python多进程模块

1.概述

这篇文章介绍并行运算中的subprocess模块,subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
subprocess 它可以用来调用第三方工具(例如:exe、另一个python文件、命令行工具)

subprocess 模块首先推荐使用的是它的 run 方法,更高级的用法可以直接使用 Popen 接口

subprocess 模块提供了了三个 API 处理进程。Python 3.5 中添加的 run() 函数,是一个运行进程高级 API,也可以收集它的输出。call(),check_call() 以及 check_output() 是从 Python2 继承的较早的高级 API。在已存的程序中,它们仍然被广泛支持和使用。
类 Popen 是一个低级 API,用于构建其他的 API 以及用于更复杂的进程交互。Popen 构造函数接受参数设置新进程,以便父进程可以通过管道与它通信。它提供了它替代的模块和函数的所有功能,以及更多功能。
API 对于所有使用场景是一致的,许多之前必要的额外步骤(例如关闭额外的文件描述符并确保通道关闭)现在已经内置了,无需单独代码处理。

subprocess模块的目的在于替换几个旧的模块和方法,替换 os.system(),os.spawnv() , os 和 popen2 模块中 popen 的变体以及 commands() 模块等。为了更容易地将 subprocess 同其他模块比较,本节中的许多示例都重新创建了用于 os 和 popen2 中的。

2.subprocess模块

2.1.运行外部命令run

1.默认运行外部命令

调用run方法创建一个进程执行指定的命令,等待命令执行完成后返回一个包含执行结果的CompletedProcess类的实例。
例如linux的ls命令

run方法参数

  • args:表示要执行的命令。必须是一个字符串,字符串参数列表。
  • stdin、stdout 和 stderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPE、subprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。
  • timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。
  • check:如果该参数设置为 True,并且进程退出状态码不是 0,则弹 出 CalledProcessError 异常。
  • encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
  • shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。
import subprocess
# 第一个参数是要运行的ls命令,第二个参数是ls命令的参数
completed = subprocess.run(['ls', '-l'])
print('returncode:', completed.returncode)

命令行参数被作为一个字符串列表传入,这样能够避免转义引号以及其他会被 shell 解析的特殊字符。run() 方法返回一个 CompletedProcess 实例,包含进程退出码以及输出等信息。

total 16
-rw-r--r--  1 edy  staff  103 Feb 17 14:01 __init__.py
-rw-r--r--  1 edy  staff  207 Feb 17 15:46 my_subprocess_1.py
returncode: 0

2.新开一个shell进程运行外部命令

设置 shell 参数为 True 会导致 subprocess 创建一个新的中间 shell 进程运行命令。默认的行为是直接运行命令。

import subprocess

completed = subprocess.run('echo $HOME', shell=True)
print('returncode:', completed.returncode)

使用中间 shell 意味着在运行该命令之前处理命令字符串的变量,glob 模式以及其他特殊的 shell 功能。

/Users/edy
returncode: 0

提醒
使用 run() 而没有传递 check=True 等价于调用 call(),它仅仅返回进程的退出码。

2.2.错误处理

CompletedProcess 的 returncode 属性是程序的退出码。调用者负责解释它并检测错误。如果 run() 的 check 参数是 True,退出码将会被检查,如果有错误出现将会引发 CalledProcessError 异常。

import subprocess

try:
    subprocess.run(['false'], check=True)
except subprocess.CalledProcessError as err:
    print('ERROR:', err)

false 命令总是返回非零状态码,run() 将它解释为一个错误。

ERROR: Command '['false']' returned non-zero exit status 1

提醒
给 run() 方法传递 check=True 等价于调用 check_all()。

2.3.捕获输出

由 run() 启动的进程的标准输入输出渠道绑定在了父进程上。那就意味着调用程序不能捕获命令的输出。给 stdout 和 stderr 参数传递 PIPE 可以捕获输出用于后续处理。

import subprocess

# 执行后的结果赋值给管道
completed = subprocess.run(
    ['ls', '-1'],
    stdout=subprocess.PIPE,
)
# 通过管道输出命令执行结果
print('returncode:', completed.returncode)
print('Have {} bytes in stdout:\n{}'.format(
    len(completed.stdout),
    completed.stdout.decode('utf-8'))
)

ls -1 命令成功运行了,所以它打印到标准输出的文本被捕获并返回了

returncode: 0
Have 31 bytes in stdout:
__init__.py
my_subprocess_1.py

提醒
传入 check=True 以及设置 stdout 为 PIPE 等价于使用 check_output()。

2.4.捕获错误输出

下个例子在子 shell 中运行了一些列的命令。在命令出错退出之前消息被发送到了标准输出和错误输出。

import subprocess

try:
    completed = subprocess.run(
        'echo to stdout; echo to stderr 1>&2; exit 1',
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
except subprocess.CalledProcessError as err:
    print('ERROR:', err)
else:
    print('returncode:', completed.returncode)
    print('Have {} bytes in stdout: {!r}'.format(
        len(completed.stdout),
        completed.stdout.decode('utf-8'))
    )

标准错误输出被打印到了控制台,但是标准错误输出被隐藏了

to stderr
ERROR: Command 'echo to stdout; echo to stderr 1>&2; exit 1'
returned non-zero exit status 1

为了阻止 run() 运行命令产生的错误消息打印到控制台,设置 stderr 参数为常量 PIPE。

import subprocess

try:
    completed = subprocess.run(
        'echo to stdout; echo to stderr 1>&2; exit 1',
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
except subprocess.CalledProcessError as err:
    print('ERROR:', err)
else:
    print('returncode:', completed.returncode)
    print('Have {} bytes in stdout: {!r}'.format(
        len(completed.stdout),
        completed.stdout.decode('utf-8'))
    )
    print('Have {} bytes in stderr: {!r}'.format(
        len(completed.stderr),
        completed.stderr.decode('utf-8'))
    )

这个例子没有设置 check=True,所以命令的输出被捕获并且打印。

returncode: 1
Have 10 bytes in stdout: 'to stdout\n'
Have 10 bytes in stderr: 'to stderr\n'

为了捕获当使用 check_output() 产生的错误消息时,设置 stderr 为 STDOUT,并且这些消息将与该命令的其余输出合并。

import subprocess

try:
    output = subprocess.check_output(
        'echo to stdout; echo to stderr 1>&2',
        shell=True,
        stderr=subprocess.STDOUT,
    )
except subprocess.CalledProcessError as err:
    print('ERROR:', err)
else:
    print('Have {} bytes in output: {!r}'.format(
        len(output),
        output.decode('utf-8'))
    )

输出顺序可能会变化,取决于对标准输出流的缓冲方式以及打印的数据量。

Have 20 bytes in output: 'to stdout\nto stderr\n'

2.5.抑制输出

某些情况下,输出不应该被展示和捕获,使用 DEVNULL 抑制输出流。这个例子抑制了标准输出流和错误输出流。

import subprocess

try:
    completed = subprocess.run(
        'echo to stdout; echo to stderr 1>&2; exit 1',
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
except subprocess.CalledProcessError as err:
    print('ERROR:', err)
else:
    print('returncode:', completed.returncode)
    print('stdout is {!r}'.format(completed.stdout))
    print('stderr is {!r}'.format(completed.stderr))

DEVNULL 的名字来自于 Unix 特殊的设备文件,/dev/null,当读时直接响应文件结束,写时接收但忽略任何数量的输入。

returncode: 1
stdout is None
stderr is None

3.直接使用管道Popen

基础的操作使用run()函数都能完成,如果遇到复杂的操作就需要使用更高级的 Popen类提供的方法, 能够对如何运行命令以及如何处理输入输出流提供更多的控制。例如,通过对 stdin,stdout 以及 stderr 传递不同的参数,可以达到模仿 os.popen() 的效果。

Popen类构造器参数

  • args:shell命令,可以是字符串或者序列类型(如:list,元组)
  • bufsize:缓冲区大小。当创建标准流的管道对象时使用,默认-1。
    0:不使用缓冲区
    1:表示行缓冲,仅当universal_newlines=True时可用,也就是文本模式
    正数:表示缓冲区大小
    负数:表示使用系统默认的缓冲区大小。
  • stdin, stdout, stderr:分别表示程序的标准输入、输出、错误句柄
  • preexec_fn:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用
  • shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。
  • cwd:用于设置子进程的当前目录。
  • env:用于指定子进程的环境变量。如果 env = None,子进程的环境变量将从父进程中继承。

Popen 对象方法

  • poll(): 检查进程是否终止,如果终止返回 returncode,否则返回 None。
  • wait(timeout): 等待子进程终止。
  • communicate(input,timeout): 和子进程交互,发送和读取数据。
  • send_signal(singnal): 发送信号到子进程 。
  • terminate(): 停止子进程,也就是发送SIGTERM信号到子进程。
  • kill(): 杀死子进程。发送 SIGKILL 信号到子进程。

3.1.与进程单向通信

运行一个进程以及读取所有输出,设置 stdout 的值为 PIPE 并且调用 communicate()。

import subprocess

print('read:')
proc = subprocess.Popen(
    ['echo', '"to stdout"'],
    stdout=subprocess.PIPE,
)
stdout_value = proc.communicate()[0].decode('utf-8')
print('stdout:', repr(stdout_value))

这个类似于 popen() 的工作方式,除了读取由 Popen 实例内部管理。

read:
stdout: '"to stdout"\n'

为了设置一个管道允许调用者向其写入数据,设置 stdin 为 PIPE

import subprocess

print('write:')
proc = subprocess.Popen(
    ['cat', '-'],
    stdin=subprocess.PIPE,
)
proc.communicate('stdin: to stdin\n'.encode('utf-8'))

为了发送数据到进程的标准输入,请使用 communicate(),这就有点同 w 模式的 popen 了。

write:
stdin: to stdin

3.2.与进程双向通信

为了设置 Popen 实例同时进行读写,结合之前使用过的技术。

import subprocess

print('popen2:')

proc = subprocess.Popen(
    ['cat', '-'],
    # 输入和输出设置为管道,进行通信
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
msg = 'through stdin to stdout'.encode('utf-8')
stdout_value = proc.communicate(msg)[0].decode('utf-8')
print('pass through:', repr(stdout_value))

这样设置使用就有点像 popen2() 了。

popen2:
pass through: 'through stdin to stdout'

3.3.捕获错误输出

同时查看 stdout 和 stderr 输出流也是可能的,就像 popen3()。

import subprocess

print('popen3:')
proc = subprocess.Popen(
    'cat -; echo "to stderr" 1>&2',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
msg = 'through stdin to stdout'.encode('utf-8')
stdout_value, stderr_value = proc.communicate(msg)
print('pass through:', repr(stdout_value.decode('utf-8')))
print('stderr      :', repr(stderr_value.decode('utf-8')))

从 stderr 中读取错误输出类似于 stdout 。传入 PIPE 告诉 Popen 附加到通道,并且使用 communicate() 在返回之前读取所有数据。

popen3:
pass through: 'through stdin to stdout'
stderr      : 'to stderr\n'

3.4.合并常规和错误输出

为了将进程的错误输出导向标准输出渠道,设置 stderr 为 STDOUT 而不是 PIPE。

import subprocess

print('popen4:')
proc = subprocess.Popen(
    'cat -; echo "to stderr" 1>&2',
    shell=True,
    # 输入管道
    stdin=subprocess.PIPE,
    # 输出管道
    stdout=subprocess.PIPE,
    # 错误输出
    stderr=subprocess.STDOUT,
)
msg = 'through stdin to stdout\n'.encode('utf-8')
stdout_value, stderr_value = proc.communicate(msg)
print('combined output:', repr(stdout_value.decode('utf-8')))
print('stderr value   :', repr(stderr_value))

这种合并输出的方式类似于 popen4() 的工作方式。

popen4:
combined output: 'through stdin to stdout\nto stderr\n'
stderr value   : None

3.5.连接管道的段

多个命令可以被连接到一个 管道 中,类似于 Unix shell 的工作方式,实现这种操作,可以通过创建分隔的 Popen 实例并将他们的输入输出链在一起。
一个 Popen 实例的 stdout 属性被用作下一个的 stdin 参数,而不是之前的常量 PIPE。要获取整个执行的输出,可以从最后一个 Popen 实例的 stdout 流读取。

import subprocess

cat = subprocess.Popen(
    ['cat', 'index.rst'],
    stdout=subprocess.PIPE,
)

# 把cat拼接到grep
grep = subprocess.Popen(
    ['grep', '.. literalinclude::'],
    # 输入为上个命令的输出值
    stdin=cat.stdout,
    stdout=subprocess.PIPE,
)

# grep拼接到cut
cut = subprocess.Popen(
    ['cut', '-f', '3', '-d:'],
    stdin=grep.stdout,
    stdout=subprocess.PIPE,
)

end_of_pipe = cut.stdout

print('Included files:')
for line in end_of_pipe:
    print(line.decode('utf-8').strip())

这个例子同下面的命令行操作:

cat index.rst | grep ".. literalinclude" | cut -f 3 -d:

这个部分首先管道读取 reStructuredText 源文件,然后找到所有包含其他文件的行,最后打印被包含的文件名称


Included files:
subprocess_os_system.py
subprocess_shell_variables.py
subprocess_run_check.py
subprocess_run_output.py
subprocess_run_output_error.py
subprocess_run_output_error_trap.py
subprocess_check_output_error_trap_output.py
subprocess_run_output_error_suppress.py
subprocess_popen_read.py
subprocess_popen_write.py
subprocess_popen2.py
subprocess_popen3.py
subprocess_popen4.py
subprocess_pipes.py
repeater.py
interaction.py
signal_child.py
signal_parent.py
subprocess_signal_parent_shell.py
subprocess_signal_setpgrp.py

3.6.同另一个命令交互

所有前面的例子都假定了一个有限的交互,communicate() 方法读取所有输出并等待子进程在返回之前退出。在程序运行时也可以逐步写入和读取 Popen 实例使用的单个管道句柄。从标准输入中读取并希望如标准输出的简单回声程序说明了这种技术。

脚本 repeater.py 被用作下一个例子的子进程。它从 stdin 读取并且写入到 stdout ,一次一行,直到再没有输入。当开始和停止的时候,它也往 stderr 写入了一条消息,展示子进程的声明周期。

创建repeater.py文件,复制下面的代码。

import sys

sys.stderr.write('repeater.py: starting\n')
sys.stderr.flush()

while True:
    next_line = sys.stdin.readline()
    sys.stderr.flush()
    if not next_line:
        break
    sys.stdout.write(next_line)
    sys.stdout.flush()

sys.stderr.write('repeater.py: exiting\n')
sys.stderr.flush()

下一个例子中以不同的方式使用 Popen 实例的 stdin 和 stdout 文件句柄。在第一个例子中,五个数字被依次写入到进程的 stdin,每次写入后,紧接着会读出输入并打印出来了。第二个例子中相同的五个数字被写入,但是输出通过 communicate() 依次行读取了。

import io
import subprocess

print('One line at a time:')
proc = subprocess.Popen(
    'python3 repeater.py',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
stdin = io.TextIOWrapper(
    proc.stdin,
    encoding='utf-8',
    line_buffering=True,  # send data on newline
)
stdout = io.TextIOWrapper(
    proc.stdout,
    encoding='utf-8',
)
for i in range(5):
    line = '{}\n'.format(i)
    stdin.write(line)
    output = stdout.readline()
    print(output.rstrip())
remainder = proc.communicate()[0].decode('utf-8')
print(remainder)

print()
print('All output at once:')
proc = subprocess.Popen(
    'python3 repeater.py',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
stdin = io.TextIOWrapper(
    proc.stdin,
    encoding='utf-8',
)
for i in range(5):
    line = '{}\n'.format(i)
    stdin.write(line)
stdin.flush()

output = proc.communicate()[0].decode('utf-8')
print(output)

每个循环中, “repeater.py: exiting” 行在输出的不同点出现。


One line at a time:
repeater.py: starting
0
1
2
3
4
repeater.py: exiting

All output at once:
repeater.py: starting
repeater.py: exiting
0
1
2
3
4

3.7.进程间的信号

os 模块的进程管理示例包括使了用 os.fork() 和 os.kill() 进程之间的信号演示。由于每个 Popen 实例都提供了一个 pid 属性和子进程 id,所以可以对子进程执行类似的操作。下一个例子合并了两个脚本,子进程设置了一个 USR 信号处理器。

脚本文件内容signal_child.py

import os
import signal
import time
import sys

pid = os.getpid()
received = False

def signal_usr1(signum, frame):
    "Callback invoked when a signal is received"
    global received
    received = True
    print('CHILD {:>6}: Received USR1'.format(pid))
    sys.stdout.flush()

print('CHILD {:>6}: Setting up signal handler'.format(pid))
sys.stdout.flush()
signal.signal(signal.SIGUSR1, signal_usr1)
print('CHILD {:>6}: Pausing to wait for signal'.format(pid))
sys.stdout.flush()
time.sleep(3)

if not received:
    print('CHILD {:>6}: Never received signal'.format(pid))

这个脚本被当做父进程运行,它启动了 signal_child.py,然后发送了 USR1 信号。

import os
import signal
import subprocess
import time
import sys

proc = subprocess.Popen(['python3', 'signal_child.py'])
print('PARENT      : Pausing before sending signal...')
sys.stdout.flush()
time.sleep(1)
print('PARENT      : Signaling child')
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)

运行结果


PARENT      : Pausing before sending signal...
CHILD  26976: Setting up signal handler
CHILD  26976: Pausing to wait for signal
PARENT      : Signaling child
CHILD  26976: Received USR1

3.8.进程 组 / 会话

如果由 Popen 创建的进程产生子进程,那么子进程将不会收到任何发送给父进程的任何信号。这意味着当对 Popen 使用 shell 参数时,很难通过发送 SIGINT 和 SIGTERM 来使 shell 中启动的命令终止。

subprocess_signal_parent_shell.py

import os
import signal
import subprocess
import tempfile
import time
import sys

script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()

proc = subprocess.Popen(['sh', script_file.name])
print('PARENT      : Pausing before signaling {}...'.format(
    proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT      : Signaling child {}'.format(proc.pid))
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)
time.sleep(3)

用于发送信号的 pid 与等待信号的运行 shell 脚本的子进程 id 不同,因为这个例子中有三个独立的进程在交互:

1.主程序 subprocess_signal_parent_shell.py
2.主程序创建的运行脚本的 shell 进程。
3.程序 signal_child.py

PARENT      : Pausing before signaling 26984...
Shell script in process 26984
+ python3 signal_child.py
CHILD  26985: Setting up signal handler
CHILD  26985: Pausing to wait for signal
PARENT      : Signaling child 26984
CHILD  26985: Never received signal

要在不知道进程 id 的情况下向后代进程发送信号,请使用进程组关联这些子进程,以便可以一起发送信号。进程组使用 os.setpgrp() 创建,它将进程组 id 设置为当前进程 id。所有子进程都从父进程继承他们的进程组,因为它只应在由 Popen 及其后代创建的 shell 中设置,所以不应在创建 Popen 的相同进程中调用 os.setpgrp() 。而是,应在作为 Popen 的 preexec_fn 参数设置的函数中调用,它会在新进程的 fork 之后运行,在用 exec 运行 shell 之前。为了给进程组发送信号,应该使用 os.killpg() 并使用 Popen 实例的进程 id。

subprocess_signal_setpgrp.py

import os
import signal
import subprocess
import tempfile
import time
import sys

def show_setting_prgrp():
    print('Calling os.setpgrp() from {}'.format(os.getpid()))
    os.setpgrp()
    print('Process group is now {}'.format(os.getpgrp()))
    sys.stdout.flush()

script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()

proc = subprocess.Popen(
    ['sh', script_file.name],
    preexec_fn=show_setting_prgrp,
)
print('PARENT      : Pausing before signaling {}...'.format(
    proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT      : Signaling process group {}'.format(
    proc.pid))
sys.stdout.flush()
os.killpg(proc.pid, signal.SIGUSR1)
time.sleep(3)

整个运行流程如下

1.父进程实例化 Popen;
2.Popen 实例 fork 新进程;
3.新进程运行 os.setpgrp();
4.新进程运行 exec() 启动 shell;
5.shell 运行脚本;
6.shell 脚本再次 fork,然后启动 Python 解释器;
7.Python 运行 signal_child.py.
8.父进程发送信号非进程组,使用 Popen 实例的进程 id;
9.shell and Python 程序收到信号;
10.shell 忽略掉了信号。
11.运行 signal_child.py 的 Python 程序 调用了信号处理器。

python3 subprocess_signal_setpgrp.py

Calling os.setpgrp() from 75636
Process group is now 75636
PARENT      : Pausing before signaling 75636...
Shell script in process 75636
+ python3 signal_child.py
CHILD  75637: Setting up signal handler
CHILD  75637: Pausing to wait for signal
PARENT      : Signaling process group 75636
CHILD  75637: Received USR1

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
xiaoxingxing的头像xiaoxingxing管理团队
上一篇 2023年9月1日
下一篇 2023年9月1日

相关推荐