IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Python多进程实践 -> 正文阅读

[Python知识库]Python多进程实践

在python中若想实现并发执行,应多使用多进程,而非多线程,python中多线程使用threading模块。在这里就涉及到python中的multiprocessing模块----一个基于进程并行的模块。它可以充分利用我们计算机上的多个核心,带来极大效率。同时也提供远程并发。

本人主要学习的multiprocessing模块提供的类有Process,Queue,Pipe,Lock,Pool,暂时够用,往后学习了其他类再做补充。

目录

Process

????????1.Process的参数????

????????2.Process的方法? ?

????????3.Processs使用测例

Queue

? ? ? ? 1. Queue的参数

? ? ? ? 2.Queue的方法

????????3.Queue使用测例

Pipe

? ? ? ? 1. Pipe的参数

????????2.Pipe使用测例

Pool

? ? ? ? 1.Pool的参数

? ? ? ? ?2.Pool的方法

? ? ? ? ?3. Pool使用测例

? ? ? ? ? ? ? ? ? ? ?

????????????????


Process

????????process能创造单独进程对象供用户使用。

????????1.Process的参数????

? ? ? ? ? ? ? ? group:仅用于兼容threading.Thread,基本不用主动设置,保持默认值即可

? ? ? ? ? ? ? ? target:? 要执行的子任务

? ? ? ? ? ? ? ? name:? ?为进程的名字,默认格式为Process - num

? ? ? ? ? ? ? ? args:? ? ?子任务的参数,必须是tuple类型

? ? ? ? ? ? ? ? kwargs: 子任务的参数,必须是字典类型

? ? ? ? ? ? ? ? daemon:守护进程的标志?,为True则为守护进程,为False则不设置为守护进程

????????2.Process的方法? ?

? ? ? ? ? ? ? ? 2.1?run()? ? ?

?????????????????用于表示进程是否活动的方法

? ? ? ? ? ? ? ? ? ?若启动,则会分配pid并提示started

????????????????????????

p1 <bound method BaseProcess.run of <Process name='Process-1' pid=7184 parent=3696 started>>

? ? ? ? ? ? ? ? ? ? 否则,不分配pid并提示 initial

p1 <bound method BaseProcess.run of <Process name='Process-1' parent=22380 initial>>

????????????????2.2 start()?

? ? ? ? ? ? ? ? ? ? 用于启动进程活动,并调用run()方法,且单独进程只能启动一次

????????????????2.3 join([timeout])

? ? ? ? ? ? ? ? ? ? ?会阻塞主进程,即主进程会等待调用join方法的进程执行结束后再开始执行

? ? ? ? ? ? ? ? ? ? ?timeout参数是可选参数,默认值为None,用于表示会阻塞timeout秒。值得注意的是必须在启动进程后再join(),且不能让进程join自身,这会导致死锁

????????????????2.4 is_alive()

? ? ? ? ? ? ? ? ? ? ? 用于判断进程是否存活,即进程是否终止

? ? ? ? ? ? ? ? 2.5 pid

? ? ? ? ? ? ? ? ? ? ?返回进程的ID,只有启动的进程才会有ID,否则为None

????????????????2.6 daemon

? ? ? ? ? ? ? ? ? ? ?守护进程的标志,布尔值类型,需要在start()方法之前调用。当进程退出时,会终止其所有守护进程子进程。值得注意的是,不能为守护进程创建子进程,否则在守护进程因父进程终止时,其子进程会变成孤儿进程

? ? ? ? ? ? ? ? 2.7 terminate()

? ? ? ? ? ? ? ? ? ? ? ? 用于终止进程,除非进程提前调用join()方法,否则不会执行该进程

????????3.Processs使用测例

? ? ? ? ? ? ? ? ? ?值得注意的是,Process对象必须在?if __name__ == "__main__":后初始化,且Process对象的方法体必须在if __name__ == "__main__":之上定义

????????????????

def say_hello(name):
    # 以 f开头表示在字符串内支持大括号内的python 表达式
    print(f"Process - {name} info say hello")
    time.sleep(random.randrange(1,5))
    print(f"Process-{name} info say goodbye")


if __name__ == "__main__":
    p1 = Process(target=say_hello,args=("p1",))
    p2 = Process(target=say_hello, args=("p2",))
    p1.start()
    p2.start()
    print("主进程")

结果为

主进程
Process - p2 info say hello
Process - p1 info say hello
Process-p1 info say goodbye
Process-p2 info say goodbye

? 可以看到进程也不是按照编写代码的顺序执行的,且若不加join则主进程在前

? 当然,创建Process类的方法也可以通过自定义类继承Process来实现

class myProcess(Process):
    def __init__(self,num):
        super(myProcess, self).__init__()
        self.num = num
    def run(self) -> None:
        print(os.getpid(),"输出了",self.num)

if __name__ == "__main__":

    for i in range(4):
        p = myProcess(i)
        p.start()

结果为

15340 输出了 0
25392 输出了 1
15520 输出了 2
5612 输出了 3

? 进程之间的内存空间也是隔离的,下面 进行验证

check_num = 10
def check_memory():
    global check_num
    check_num = os.getpid()
    print("我是子进程",check_num)

if __name__ == "__main__":

    p = Process(target=check_memory)
    p.start()
    time.sleep(2)
    print("我是主进程",check_num)

结果为

我是子进程 18828
我是主进程 10

?由此可知,进程间的内存空间不共享

Queue

????????返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。队列也是一个效率高且不会存在进程的相互竞争所导致的错乱问题,简单来说Queue很安全

? ? ? ? 1. Queue的参数

? ? ? ? ? ? ? ?Queue([maxsize])

? ? ? ? ? ? ? ? ? ? ? ? maxsize为队列的最大项数

? ? ? ? 2.Queue的方法

? ? ? ? ? ? ? ? ? ? ?2.1 qsize()

????????????????????????????????返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。

? ? ? ? ? ? ? ? ? ? ?2.2 empty()

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 判断队列是否为空

? ? ? ? ? ? ? ? ? ? ? 2.3 full()

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 判断队列是否为满

? ? ? ? ? ? ? ? ? ? ? 2.4??put(obj[,?block[,?timeout]])

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?obj为你要放入队列的对象? ?,block参数是布尔值,为True(默认值)时,等待timeout秒若队列中仍未有可用缓冲槽则抛出异常,期间会阻塞进程。? 若block为False??仅当有可用缓冲槽时放入对象,否则抛出异常 ,且在这种情形下?timeout?参数会被忽略

? ? ? ? ? ? ? ? ? ? ? ?2.5?get([block[,?timeout]])

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?从队列中取出并返回对象,block参数是布尔值,为True(默认值)时,等待timeout秒若队列中仍未有可用对象则抛出异常,期间会阻塞进程。? 若block为False??仅当有可用对象能够取出时返回,否则抛出异常 ,且在这种情形下?timeout?参数会被忽略

????????3.Queue使用测例

class student(Process):
    def __init__(self,q):
        super(student, self).__init__()
        self.q = q
    def run(self):
        while True:
            res = self.q.get()
            time.sleep(random.randint(1,5))
            print(f"{os.getpid()} 写了 {res}")

class teacher(Process):
    def __init__(self,q):
        super(teacher, self).__init__()
        self.q = q

    def run(self):
        for i in range(5):
            bid = "作业"+str(i)
            time.sleep(random.randint(1,3))
            self.q.put(bid)
            print(f"{os.getpid()} 布置了{bid}")

if __name__ == "__main__":
    
    q= Queue()
    s = student(q)
    t = teacher(q)

    s.start()
    t.start()

? 结果为

? ? ? ? ? ? ??

4772 布置了作业0
4772 布置了作业1
24160 写了 作业0
4772 布置了作业2
4772 布置了作业3
4772 布置了作业4
24160 写了 作业1
24160 写了 作业2
24160 写了 作业3
24160 写了 作业4

但是这有一个问题,即程序不会终止,因为student一直在等待teacher布置任务,陷入死循环

解决方式为使用 JoinableQueue

class student(Process):
    def __init__(self,q):
        super(student, self).__init__()
        self.q = q
    def run(self):
        while True:
            res = self.q.get()
            time.sleep(random.randint(1,5))
            print(f"{os.getpid()} 写了 {res}")

            self.q.task_done()

class teacher(Process):
    def __init__(self,q):
        super(teacher, self).__init__()
        self.q = q

    def run(self):
        for i in range(5):
            bid = "作业"+str(i)
            time.sleep(random.randint(1,3))
            self.q.put(bid)
            print(f"{os.getpid()} 布置了{bid}")

        self.q.join()

if __name__ == "__main__":

    q= JoinableQueue()
    s = student(q)
    t = teacher(q)
    s.daemon =True

    s.start()
    t.start()
    t.join()
    print("主进程")
18220 布置了作业0
13888 写了 作业0
18220 布置了作业1
18220 布置了作业2
18220 布置了作业3
13888 写了 作业1
18220 布置了作业4
13888 写了 作业2
13888 写了 作业3
13888 写了 作业4
主进程

解释一下JoinableQueue的两个方法

task_done() 是用来告诉teacher进入队列的任务都已经完成了,会给队列的join方法发送一个信号,如果join方法在阻塞中,则该方法会在所有对象处理完后返回

join()方法会阻塞进程直至队列中所有元素都被处理完毕

因此这里的顺序为teacher等待student写完作业就往队列里加作业,加载完毕后就直至student写完,开始主进程,主进程结束,作为守护进程的student也结束了

Pipe

? ? ? ? 能够在两个进程之间传递消息

? ? ? ? 1. Pipe的参数

????????????????Pipe([duplex])

? ? ? ? ? ? ? ? ? ? ? ?Pipe返回一个元组(conn1,conn2)两端都可以接受和发送 数据,?若duplex为True(默认值)则那么该管道是双向的。如果?duplex?被置为?False?,那么该管道是单向的,即只能一端接受,一端发送

????????2.Pipe使用测例

def pipe_put(p):

    for i in range(5):
        p.send([i,os.getpid()])

        print("我放入了",i,os.getpid(),"\n")
        time.sleep(1)
    p.close()
def pipe_get(p):
    print(os.getpid(),"开始获取","\n")
    while True:
        try:
            print(os.getpid(),"得到了",p.recv(),"\n")
    # time.sleep(1)
        except:
            p.close()

if __name__ == "__main__":
 # 管道的两端
    teacher,student = Pipe()
    p = Process(target=pipe_put,args=(teacher,))
    q = Process(target=pipe_get,args=(student,))

    p.start()
    
    q.start()
    q.join()

结果为

20312 开始获取 

我放入了 0 14260 

20312 得到了 [0, 14260] 

我放入了 1 14260 

20312 得到了 [1, 14260] 

我放入了 2 14260 

20312 得到了 [2, 14260] 

我放入了 203123  得到了

14260 [3, 14260]  

我放入了 4 14260 

20312  得到了 [4, 14260] 

? 值得注意的是,程序禁止两个不同的进程同时尝试读取同一端,或者写入同一端?? ? ? ? ? ??

Pool

? ? ? ? 可以创建一个进程池,简单来说就是提供一定数量的进程来供用户使用,和线程池类似,若用户提交请求且进程池未满则,分配进程给用户,否则等待空闲进程。

? ? ? ? 1.Pool的参数

????Pool([processes[,?initializer[,?initargs[,?maxtasksperchild[,?context]]]]])

? ? ? ? ? ? ? ? ? ? ? ? 1.1 processes: 为要使用的工作进程的数目。如果?processes?为?None,则使用?os.cpu_count()?返回的值

? ? ? ? ? ? ? ? ? ? ? ? 1.2 initilizer:为每一个进程启动时要执行的对象

? ? ? ? ? ? ? ? ? ? ? ? 1.3 initargs: 为交给initilizer的参数组

? ? ? ? ? ? ? ? ? ? ? ? 1.4?maxtasksperchild:?是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源

? ? ? ? ?2.Pool的方法

? ? ? ? ? ? ? ? 2.1?apply(func[,?args[,?kwds]])

? ? ? ? ? ? ? ? ? ? ? ? apply会在返回结果前阻塞,不太适合并行化处理

? ? ? ? ? ? ? ? 2.2?apply_async(func[,?args[,?kwds[,?callback[,?error_callback]]]])

? ? ? ? ? ? ? ? ? ? ? ? 有一个callback和error_callback回调函数,他们都可以接收单个参数并执行回调,不同之处在于执行成功时调用callback,失败时调用error_callback。相比于apply方法,它属于异步处理,即可以不等待进程池中的进程执行完毕即可继续分配进程并执行

? ? ? ? ? ? ? ? 2.3 close()

? ? ? ? ? ? ? ? ? ? ? ? 关闭进程池,阻止新任务继续提交到进程池,等待进程池中任务执行完毕则退出

? ? ? ? ? ? ? ? 2.4 join()

? ? ? ? ? ? ? ? ? ? ? ? 等待工作进程,调用join前必须先调用close或者terminate

? ? ? ? ? ? ? ? 2.5 terminate()

? ? ? ? ? ? ? ? ? ? ? ? 立即停止所有进程

? ? ? ? ?3. Pool使用测例

? ? ? ? ? ? ? ??

def fun(num):
    print("我来了",num)
    time.sleep(random.randint(1,5))

    return None


if __name__ == "__main__":

    with Pool(processes=4) as pool:
        for i in range(10):
            result = pool.apply(fun,(i,))
            print("我好了",i)
        pool.close()
        pool.join()
    print("主进程")

结果为

我来了 0
我好了 0
我来了 1
我好了 1
我来了 2
我好了 2
我来了 3
我好了 3
我来了 4
我好了 4
我来了 5
我好了 5
我来了 6
我好了 6
我来了 7
我好了 7
我来了 8
我好了 8
我来了 9
我好了 9
主进程

显而易见,apply是同步方法,需等待进程池中任务执行完毕后才可以继续分配进程

再来看看apply_async

def fun(num):
    print("我来了",num)
    time.sleep(random.randint(1,5))

    return None


if __name__ == "__main__":

    with Pool(processes=4) as pool:
        for i in range(10):
            result = pool.apply_async(fun,(i,))
            print("我好了",i)
        pool.close()
        pool.join()
    print("主进程")

结果为

我好了 0
我好了 1
我好了 2
我好了 3
我好了 4
我好了 5
我好了 6
我好了 7
我好了 8
我好了 9
我来了 0
我来了 1
我来了 2
我来了 3
我来了 4
我来了 5
我来了 6
我来了 7
我来了 8
我来了 9
主进程

还可以调用callback

def fun(num):
    print("我来了",num)
    return num*num

def back(res):
    print(os.getpid(),"拿到了",res)
    time.sleep(3)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        for i in range(5):
            result = pool.apply_async(fun,(i,),callback=back)
        pool.close()
        pool.join()
    print("主进程")

结果为

我来了 0
我来了 1
14944 拿到了 0
我来了 2
我来了 3
我来了 4
14944 拿到了 1
14944 拿到了 4
14944 拿到了 9
14944 拿到了 16
主进程

如有错误,请指正

????????????????

????????

? ? ? ? ? ? ? ? ? ? ?

????????????????

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-11-15 15:49:40  更:2021-11-15 15:50:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 23:40:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码