python之多进程 multiprocessing

python的多进程模块在文件I/O操作的时候用的特别多,尽可能的发挥出多核系统的所有性能。

针对python的并发任务做个大体的总结。

multiprocessing Basics

1
2
3
4
5
6
7
8
9
10
11
import threading
import time

def hello(n):
time.sleep(random.randint(1,3))
print("[{0}] Hello\n".format(n))

for i in range(10):
threading.Thread(target=hello, args=(i,)).start()

print("Done!")

输出:

Done!

[1] Hello

[6] Hello

[9] Hello

[5] Hello

[2] Hello

[3] Hello

[4] Hello

[8] Hello

[7] Hello

[0] Hello

可以看出来 hello这个函数是 并行的调用,而不是按照顺序的调用

如果想让Done在所有threads调用完之后再运行,可以使用join

需要捕捉每一个 threading.Thread的实例,放入一个list,然后每一个thread调用join,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import random
import time

def hello(n):
time.sleep(random.randint(1,3))
print("[{0}] Hello! \n".format(n))

threads = []
for i in range(10):
t = threading.Thread(target=hello, args=(i,))
threads.append(t)
t.start()

for thread in threads:
thread.join()

print("Done!")

输出:

[0] hello!

[7] hello!

[4] hello!

[9] hello!

[6] hello!

[2] hello!

[5] hello!

[3] hello!

[8] hello!

[1] hello!

Done!

这个版本与第一个版本唯一不同之处就是将thread对象放入了list中,然后迭代整个list,一个一个的做join

那么multiprocess和 threads什么区别呢

  • threading ——> multiprocessing

  • Thread —> Process

  • threads —> processes

  • thread —> process

如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing
import time
import random

def hello(n):
time.sleep(random.randint(1,3))
print("[{0}] hello\n".format(n))

processes = []
for i in range(10):
p = multiprocessing.Process(target=hello, args=(i,))
processes.append(p)
p.start()

for pro in processes:
pro.join()

print("Done!")

每一个Process 代表一个进程, 可以使用 ps来查看,以及可以 使用 kill 来停止

有何不同

threads是共享全局变量,不同的threads之间是可以互相影响的

process是独立的分开的,一个process是无法影响另一个process的变量、

如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading,thread
import random
import time

mylist=[]

def hello(n):
time.sleep(random.randint(1,3))
mylist.append(thread.get_ident())
print("[{0}] Hello\n".format(n))

threads = []
for i in range(5):
t = threading.Thread(target=hello, args=(i,))
threads.append(t)
t.start()

for one_thread in threads:
one_thread.join()

print("Done!")
print(len(mylist))
print(mylist)

输出的结果是:

[0] Hello

[2] Hello

[1] Hello

[3] Hello

[4] Hello

Done!
5
[123145472749568, 123145481162752, 123145476956160, 123145485369344, 123145489575936]

可以看到mylist这个变量是共享的,每一个thread都可以在mylist里添加element

再来看看process:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

import multiprocessing
import random
import time,os

mylist=[]

def hello(n):
time.sleep(random.randint(1,3))
mylist.append(os.getpid())
print("[{0}] hello!\n".format(n))

processes = []
for i in range(5):
t = multiprocessing.Process(target=hello, args=(i,))
processes.append(t)
t.start()

for one_process in processes:
one_process.join()

print("Done!")
print(len(mylist))
print(mylist)

输出结果是:

[0] hello!

[4] hello!

[1] hello!

[3] hello!

[2] hello!

Done!
0
[]

为什么是空的??

因为5个进程分别会有一个list,然后在list里添加pid

而主进程的mylist呢,仍然是空的,因为没有人往里面添加pid

Queues

在我们写代码的时候,最好不要使用thread来操作全局变量,因为 线程不安全

想保证线程安全的的话,最好使用 multiprocessing里的 Queue

Queues是FIFO的

添加data,调用put

取data,调用get

queue能作为各个进程之间的桥梁,可以作为进程之间的通讯,比如如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import multiprocessing
import time,random,os
from multiprocessing import Queue

q = Queue()

def hello(n):
time.sleep(random.randint(1,3))
q.put(os.getpid())
print("[{0}] Hello\n".format(n))

processes = []
for i in range(5):
t = multiprocessing.Process(target=hello, args=(i,))
processes.append(t)
t.start()

for one_process in processes:
one_process.join()

mylist = [ ]
while not q.empty():
mylist.append(q.get())

print("Done!")
print(len(mylist))
print(mylist)

Queue在不同的进程之间进行通讯,可以使用pickle来保存

q.put(os.getpid()) 每一个不同的进程会将自己的pid保存到队列中

总结

threads不是真正意义上的并发

multiprocessing提供了线程安全的方法

附加

Queue和Pipe

Pipe输出结果也是先进先出,跟Queue的原理一样

如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Pipe, Process

conn1, conn2 = Pipe()

def f1():
for i in range(5):
conn1.send(i)

def f2():
for i in range(5):
print(conn2.recv())

Process(target=f1).start()
Process(target=f2).start()

Queue如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process,Pipe

queue = Queue()

def f1():
for i in range(5):
queue.put(i)

def f2():
for i in range(5):
print(queue.get())

Process(target=f1).start()
Process(target=f2).start()

输出都是

0

1

2

3

4

5

具体的区别:

For passing messages one can use Pipe() (for a connection between two processes) or a Queue() (which allows multiple producers and consumers).

Pipe用于两个进程通信

Queue用户多个(两个以及以上)间的进程通信

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

参考Pipe vs Queue

参考文章:

Multiprocessing in Python