Python 开发扫描器时往往会调用其他第三方工具,但是通过 os.system()
或者其他命令中是会有这样或那样的坑或不足,本文对这些问题进行总结并提出解决方案,并在结尾给出了一个封装好的类,能较为完美的解决Python调用第三方命令的问题。
需求
首先,我可以调用任意命令,并且可以在命令执行时向输入通道(stdin)传更多输入,同时Python能实时获取输出通道(stdout)和错误通道(stderr)获程序输出,注意是获取输出,而不是单纯的将输出重定向到屏幕或文件中;
我可以通过返回值,或者程序输出判断第三方程序执行是否出错,如果出错抛出异常或者进行异常处理;
执行命令是最好可以设置超时时间,防止子命令假死而影响主程序;
这段调用程序应该是跨平台的,毕竟Python本身就是跨平台语言。
实现 看到第一条和第二条需求,首先想到用 subprocess.Popen()
可以实现,确实,我们可以基于它进行改造,完成我们的任务。
subprocess.communicate()
虽然可以让 Python 拿到程序返回到stdout或stderr的输出,但是它不是实时的,要想做到实时输出,首先将stdout和stderr通道重定向到subprocess.PIPE
,这里方便起见,我先将stderr通道重定向到stdout,再将stdout重定向到subprocess.PIPE
,注意如果是Windows平台,直接传命令是没法拿到正确的程序返回值的,因此要在后面加上& exit
命令:
1 2 3 4 5 6 if platform.architecture()[-1 ] == "ELF" : cmd = cmd else : cmd = cmd + " & exit" process = subprocess.Popen(cmd, shell=True , bufsize=1024 , stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
下面考虑如何从process中拿数据的问题,这里一定要新开一个线程取数据,否则可能会出现子命令执行产生大量输出填满了subprocess的缓冲区,导致整个程序阻塞的问题:
1 2 log_thread = threading.Thread(target=print_log, args=(process.stdout,)) log_thread.start()
以上的 print_log(stdout)
函数用来不断向输出缓冲区取数据,并进行其他处理,这里的stdout相当于一个文件句柄,但是实践表明不能用 readline() 和readlines() 读数据,因为第三方程序输出有可能会用\r
在同一行上多次打印输出(如在命令行显示进度条的及情况),用readlines()
更会造成程序运行效率慢的问题(谁也不想一个日志处理环节把一个核心的CPU打满吧?),因此读取日志的函数应该这样写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 LASTLINE="" def print_log ( stdout) : for log_line in _log_line_iter(stdout): process_line(log_line) def _log_line_iter (reader) : global LASTLINE while True : buf = reader.read(1024 ) if buf: if platform.architecture()[-1 ] == "ELF" : lines = buf.decode('utf8' , errors='ignore' ) else : lines = buf.decode('gbk' , errors='ignore' ) lines = lines.replace('\r\n' , '\n' ).replace('\r' , '\n' ).split('\n' ) lines[0 ] = lastline + lines[0 ] for line in lines[:-1 ]: if len(line) > 0 : yield line lastline = lines[-1 ] else : break
这里用 read()
函数从缓冲区拿数据,并且通过 process_line(log_line)
进行处理,这个函数就是我们可以自定的函数了,该打日志打日志,该保存保存。
接下来就是如何进行超时处理的,这里subprocess的wait(timeout)
已经具有了超时功能,这里我们只要注意超时后将子进程杀死即可:
1 2 3 4 5 6 7 8 9 10 11 def kill (proc_pid) : process = psutil.Process(proc_pid) for proc in process.children(recursive=True ): proc.kill() process.kill() try : process.wait(timeout=timeout) except subprocess.TimeoutExpired: kill(process.pid) log_thread.join() raise
注意杀死进程后,还是需要将程序最后输出获取完,再结束日志进程。
最后可以通过日志和程序返回值process.returncode
获取程序执行情况。
封装类 根据以上实现,本人封装了一个较为方便的超类,用的时候只需要继承这个类,再自定义打印日志和命令执行我能的回调函数即可(在回调函数中可以做异常处理)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import threadingimport loggingimport platformimport subprocessimport psutilOS_LINUX = "ELF" def kill (proc_pid) : process = psutil.Process(proc_pid) for proc in process.children(recursive=True ): proc.kill() process.kill() class CommandRunner : def __init__ (self, output_size: int = 500 ) : self.log_size = output_size self.output = list() self.lastline = '' self.cmd = None _, os = platform.architecture() self.os = OS_LINUX if not os else os def log (self, log_line) : raise NotImplementedError def callback (self) : raise NotImplementedError def run_cmd (self, cmd: str, timeout: int = -1 ) -> int: logging.info("Running cmd: \"{}\"" .format(cmd)) if self.os == OS_LINUX: self.cmd = cmd else : self.cmd = cmd + " & exit" self.process = subprocess.Popen(self.cmd, shell=True , bufsize=1024 , stdout=subprocess.PIPE, stderr=subprocess.STDOUT) log_thread = threading.Thread(target=self.print_log, args=(self.process.stdout,)) log_thread.start() if timeout > 0 : try : self.process.wait(timeout=timeout) except subprocess.TimeoutExpired: kill(self.process.pid) log_thread.join() raise else : self.process.wait() log_thread.join() return self.callback() def print_log (self, stdout) : for log_line in self._log_line_iter(stdout): self.log(log_line) if len(self.output) > self.log_size: del self.output[0 ] self.output.append(log_line) def _log_line_iter (self, reader) : while True : buf = reader.read(1024 ) if buf: if self.os == OS_LINUX: lines = buf.decode('utf8' , errors='ignore' ) else : lines = buf.decode('gbk' , errors='ignore' ) lines = lines.replace('\r\n' , '\n' ).replace('\r' , '\n' ).split('\n' ) lines[0 ] = self.lastline + lines[0 ] for line in lines[:-1 ]: if len(line) > 0 : yield line self.lastline = lines[-1 ] else : break