0%

使用Python优雅调用其他工具或命令

Python 开发扫描器时往往会调用其他第三方工具,但是通过 os.system()或者其他命令中是会有这样或那样的坑或不足,本文对这些问题进行总结并提出解决方案,并在结尾给出了一个封装好的类,能较为完美的解决Python调用第三方命令的问题。

需求

  1. 首先,我可以调用任意命令,并且可以在命令执行时向输入通道(stdin)传更多输入,同时Python能实时获取输出通道(stdout)和错误通道(stderr)获程序输出,注意是获取输出,而不是单纯的将输出重定向到屏幕或文件中;
  2. 我可以通过返回值,或者程序输出判断第三方程序执行是否出错,如果出错抛出异常或者进行异常处理;
  3. 执行命令是最好可以设置超时时间,防止子命令假死而影响主程序;
  4. 这段调用程序应该是跨平台的,毕竟Python本身就是跨平台语言。

实现

看到第一条和第二条需求,首先想到用 subprocess.Popen() 可以实现,确实,我们可以基于它进行改造,完成我们的任务。

subprocess.communicate() 虽然可以让 Python 拿到程序返回到stdout或stderr的输出,但是它不是实时的,要想做到实时输出,首先将stdout和stderr通道重定向到subprocess.PIPE,这里方便起见,我先将stderr通道重定向到stdout,再将stdout重定向到subprocess.PIPE,注意如果是Windows平台,直接传命令是没法拿到正确的程序返回值的,因此要在后面加上& exit 命令:

1
2
3
4
5
6
if platform.architecture()[-1] == "ELF":
cmd = cmd
else:
cmd = cmd + " & exit"
process = subprocess.Popen(cmd, shell=True, bufsize=1024, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)

下面考虑如何从process中拿数据的问题,这里一定要新开一个线程取数据,否则可能会出现子命令执行产生大量输出填满了subprocess的缓冲区,导致整个程序阻塞的问题:

1
2
log_thread = threading.Thread(target=print_log, args=(process.stdout,))
log_thread.start()

以上的 print_log(stdout)函数用来不断向输出缓冲区取数据,并进行其他处理,这里的stdout相当于一个文件句柄,但是实践表明不能用 readline() 和readlines() 读数据,因为第三方程序输出有可能会用\r在同一行上多次打印输出(如在命令行显示进度条的及情况),用readlines()更会造成程序运行效率慢的问题(谁也不想一个日志处理环节把一个核心的CPU打满吧?),因此读取日志的函数应该这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
LASTLINE=""
def print_log( stdout):
for log_line in _log_line_iter(stdout):
process_line(log_line)

def _log_line_iter(reader):
global LASTLINE
while True:
buf = reader.read(1024)
if buf:
if platform.architecture()[-1] == "ELF":
lines = buf.decode('utf8', errors='ignore')
else:
lines = buf.decode('gbk', errors='ignore')
lines = lines.replace('\r\n', '\n').replace('\r', '\n').split('\n')
lines[0] = lastline + lines[0]
for line in lines[:-1]:
if len(line) > 0:
yield line
lastline = lines[-1]
else:
break

这里用 read()函数从缓冲区拿数据,并且通过 process_line(log_line) 进行处理,这个函数就是我们可以自定的函数了,该打日志打日志,该保存保存。

接下来就是如何进行超时处理的,这里subprocess的wait(timeout)已经具有了超时功能,这里我们只要注意超时后将子进程杀死即可:

1
2
3
4
5
6
7
8
9
10
11
def kill(proc_pid):
process = psutil.Process(proc_pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
kill(process.pid)
log_thread.join()
raise

注意杀死进程后,还是需要将程序最后输出获取完,再结束日志进程。

最后可以通过日志和程序返回值process.returncode获取程序执行情况。

封装类

根据以上实现,本人封装了一个较为方便的超类,用的时候只需要继承这个类,再自定义打印日志和命令执行我能的回调函数即可(在回调函数中可以做异常处理)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import threading
import logging
import platform
import subprocess
import psutil
OS_LINUX = "ELF"
def kill(proc_pid):
process = psutil.Process(proc_pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()

class CommandRunner:
def __init__(self, output_size: int = 500):
self.log_size = output_size
self.output = list()
self.lastline = ''
self.cmd = None
_, os = platform.architecture()
self.os = OS_LINUX if not os else os

def log(self, log_line):
raise NotImplementedError

def callback(self):
raise NotImplementedError

def run_cmd(self, cmd: str, timeout: int = -1) -> int:
logging.info("Running cmd: \"{}\"".format(cmd))
if self.os == OS_LINUX:
self.cmd = cmd
else:
self.cmd = cmd + " & exit"
self.process = subprocess.Popen(self.cmd, shell=True, bufsize=1024, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
log_thread = threading.Thread(target=self.print_log, args=(self.process.stdout,))
log_thread.start()

if timeout > 0:
try:
self.process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
kill(self.process.pid)
log_thread.join()
raise
else:
self.process.wait()
log_thread.join()

return self.callback()

def print_log(self, stdout):
for log_line in self._log_line_iter(stdout):
self.log(log_line)
if len(self.output) > self.log_size:
del self.output[0]
self.output.append(log_line)

def _log_line_iter(self, reader):
while True:
# fix massive log(memory error)
buf = reader.read(1024)
if buf:
if self.os == OS_LINUX:
lines = buf.decode('utf8', errors='ignore')
else:
lines = buf.decode('gbk', errors='ignore')
lines = lines.replace('\r\n', '\n').replace('\r', '\n').split('\n')
lines[0] = self.lastline + lines[0]
for line in lines[:-1]:
if len(line) > 0:
yield line
self.lastline = lines[-1]
else:
break