admin 管理员组

文章数量: 1087652

python进程通信方式总结(一):管道与信号量

进程介绍

一个独立进程不受其他进程执行的影响,而一个协作进程可能会受到其他执行进程的影响,尽管可以认为那些独立运行的进程将非常高效地执行,但实际上,在许多情况下,可以利用合作性质来提高计算速度,便利性和模块化。进程间通信(IPC)是一种机制,允许进程彼此通信并同步其动作。这些过程之间的通信可以看作是它们之间进行合作的一种方法。

进程主要通过以下两者相互通信:

  1. 共享内存
  2. 讯息传递

而在实际使用情况中,我们又可以将其分为7种,如下图所示:

下面就对上面列举的方式在python中进行逐个说明,可能我理解的内容与理论有些出入,因为我是从实际使用上总结,欢迎私信或者评论。

python进程方式

进程通信方式说明

  • 管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
  • 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
  • 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  • 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。
  • 信号量Semaphore:信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
  • 套接字Socket:套解口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同及其间的进程通信。
  • 信号 ( sinal ) : 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。

上述七种方式对应于python中有三个主要的包能完成操作,分别是:

  • subprocess:可以在当前程序中执行其他程序或命令;
  • mmap:提供一种基于内存的进程间通信机制;
  • multiprocessing:提供支持多处理器技术的多进程编程接口,并且接口的设计最大程度地保持了和threading模块的一致,便于理解和使用。

下面针对七种方式与python的三种模块进行分别讲解:

python利用管道通信

multiprocessing.pipe

关于管道,我用的倒不是很多,通常在python中,使用管道一般都是subprocess模块中的popen,并且是在调用外部shell程序,对应的还有stdin,stdout的状态,而我也是在写本篇博文,找资料的时候才知道multiprocessing竟然也有一个pipe,但这个通信方式给我的感觉更像是双向队列,并且拆分成了两个,具体的demo参考:

# coding:utf-8
from multiprocessing import Process, Pipedef func(conn2):conn2.send("I am a child process.")print("Message from the parent process:", conn2.recv())conn2.close()if __name__ == '__main__':conn1, conn2 = Pipe()  # 建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息p = Process(target=func, args=(conn2,))  # 将管道的一端给子进程p.start()  # 开启子进程print("Message from the child process:", conn1.recv())  # 主进程接受来自子进程的消息conn1.send("I am the main process.")  # 主进程给子进程发送消息conn1.close()

demo中数据从conn1流向conn2,而conn2的消息发送给了conn1,这种叫全双工模式,因为有个默认值duplex参数为True,为False就只能1进2出。

上述是建立在一种比较理想的测试环境下进行的,我没有具体看过multiprocessing的源码,因为是调用的C语言包,听说里面的弯弯绕绕还是挺多的,但从一些issue里得知,multi的pipe有线程不安全问题,还有数据接收端会在没数据的时候卡住,关于前面这个问题,也能用一个例子来解释:

from threading import Thread, Locknumber = 0def target():global numberfor _ in range(1000000):number += 1thread_01 = Thread(target=target)
thread_02 = Thread(target=target)
thread_01.start()
thread_02.start()
thread_01.join()
thread_02.join()
print(number)

多跑例子几次,我们会发现每次输出的number都不相同,原因就是如果没有锁的机制,多个线程先后更改数据造成所得到的数据是脏数据,这就叫线程不安全。而解决的方法就是加锁,就是上面代码注释的那部分替换。

但一般都是用pipe都会使用进程去开,那么就避免了探讨安不安全的问题。关于第二个问题,我在实验过后发现确实如此,如果发送和接收数据不对等,程序会卡住,且没有任何报错,所以,可能因为pipe的种种限制,以及支持场景较少,而直接采用queue来进行了二次封装,线程threading模块同样做了相关改进,根据某些资料说是queue自身实现了锁原语,因此它才能实现人工原子操作。

subprocess.popen

popen的通用格式为:

subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)

具体参数为:

参数名参数说明
args要执行的命令或可执行文件的路径
bufsize控制 stdin, stdout, stderr 等参数指定的文件的缓冲,和打开文件的 open()函数中的参数 bufsize 含义相同
executable如果这个参数不是 None,将替代参数 args 作为可执行程序
stdin指定程序的标准输入
stdout指定程序的标准输出
stderr指定程序的标准错误输出
preexec_fn默认是None,否则必须是一个函数或者可调用对象,在子进程中首先执行这个函数,然后再去执行为子进程指定的程序或Shell。
close_fds布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件;
shell布尔型变量,明确要求使用shell运行程序,与参数 executable 一同指定子进程运行
cwd代表路径的字符串,指定子进程运行的工作目录,要求这个目录必须存在;
env字典,键和值都是为子进程定义环境变量的字符串;
universal_newline布尔型变量,为 True 时,stdout 和 stderr 以通用换行(universal newline)模式打开
creationfalgs最后这两个参数是Windows中才有的参数,传递给Win32的CreateProcess API调用。

然后关于这个的应用场景,一般都是通过它调用一个进程去处理shell语句,我是根据它做ffmpeg的调用,用来生成视频,以及一些其它的流媒体。下面是调用shell的一个demo:

import os,time
from subprocess import *
from multiprocessing import *def run_shell_cmd(cmd_str, index):print('run shell cmd index %d'%(index,))proc = Popen(['/bin/zsh', '-c', cmd_str],stdout=PIPE)time.sleep(1)outs = proc.stdout.readlines()proc.stdout.close()proc.terminate()return def multi_process_exc():pool = cmd_str = 'ps -ef | grep chromium'for x in range(10):   p = Process(target=run_shell_cmd, args=(cmd_str,x))p.start()pool.append(p)for p in pool:p.join()
if __name__ == "__main__":multi_process_exc()

subprocess模块能说的不多,因为我也用得不多,当然,除了这个,还有很多管道的例子,比如opencv官网下的一个issue,就有人用win32pipe来做信息传输:

#!/usr/bin/env python
import cv2
import win32pipe, win32file
from threading import Threaddef runPipe():    p = win32pipe.CreateNamedPipe(r'\\.\pipe\myNamedPipe',win32pipe.PIPE_ACCESS_DUPLEX,win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT,1, 1024, 1024, 0, None)        win32pipe.ConnectNamedPipe(p, None)    with open("D:\\Streams\\mystream.ts", 'rb') as input:while True:data = input.read(1024)if not data:breakwin32file.WriteFile(p, data)                def extract():cap = cv2.VideoCapture(r'\\.\pipe\myNamedPipe')    fnum = 0while(True):# Capture frame-by-frameret, frame = cap.read()                                                                             print fnum, "pts:", cap.get(cv2.cv.CV_CAP_PROP_POS_MSEC)fnum = fnum + 1                # When everything done, release the capturecap.release()   if __name__ == "__main__":    thr = Thread(target=extract)thr.start()runPipe()print "bye"

但是到今年,目前也应该被淘汰了,目前主流都是基于queue,这将留在下一篇讲解。

python利用信号量通信

这个东西基本就没有用到过了,但仔细想想,其实很多底层都有用到,任何一个web框架,它都和上下文有些关系,像Django里的signal,flask中也是内置信号,它与设计模式中的观察者基本一致,后续我会再说明生产者消费者模型在queue里,所以这里简单提一下,关于在多进程中直接使用,multiprocessing与threading中都是叫Semaphore,Semaphore和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过,demo为:

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphoredef home(name, se):se.acquire()  # 拿到一把钥匙print('%s进入了房间' % name)time.sleep(random.randint(1, 5))print('******************%s走出来房间' % name)se.release()  # 还回一把钥匙if __name__ == '__main__':se = Semaphore(2)       # 创建信号量的对象,有两把钥匙for i in range(7):p = Process(target=home, args=('tom{}'.format(i), se))p.start()
"""
tom1进入了房间
tom0进入了房间
******************tom1走出来房间
tom2进入了房间
******************tom0走出来房间
tom3进入了房间
******************tom3走出来房间
tom4进入了房间
******************tom2走出来房间
tom5进入了房间
******************tom5走出来房间
tom6进入了房间
******************tom4走出来房间
******************tom6走出来房间
"""

关于实际应用,可以看一道lc。

我们提供一个类:"""
class FooBar {public void foo() {for (int i = 0; i < n; i++) {print("foo");}}public void bar() {for (int i = 0; i < n; i++) {print("bar");}}
}
"""两个不同的线程将会共用一个 FooBar 实例。其中一个线程将会调用 foo() 方法,另一个线程将会调用 bar() 方法。请设计修改程序,以确保 "foobar" 被输出 n 次。
import threading
class FooBar:def __init__(self, n):self.n = nself.foo_lock = threading.Semaphore()self.foo_lock.acquire()self.bar_lock = threading.Semaphore()self.bar_lock.acquire()def foo(self, printFoo: 'Callable[[], None]') -> None:for i in range(self.n):# printFoo() outputs "foo". Do not change or remove this line.printFoo()self.bar_lock.release()self.foo_lock.acquire()def bar(self, printBar: 'Callable[[], None]') -> None:for i in range(self.n):# printBar() outputs "bar". Do not change or remove this line.self.bar_lock.acquire()printBar()self.foo_lock.release()

本文标签: python进程通信方式总结(一)管道与信号量