Python: threads & processes

I. Differences between processes and threads

1. A thread is the smallest unit of execution on a CPU; what actually runs on the CPU is a thread, not a process.
2. A process is a collection of resources for its threads; a process contains at least one thread.
3. Threads within a process share memory; separate processes cannot directly share memory space.

II. Threads

1. Creating a thread:

import threading, time

start_time = time.time()

def run():
    time.sleep(2)
    print('In threading', threading.current_thread())  # print the current sub-thread

for i in range(5):
    t = threading.Thread(target=run)
    t.start()

print('In the main threading', threading.current_thread())
print('Running time :', time.time() - start_time)

Running result:

In the main threading <_MainThread(MainThread, started 9888)>
Running time : 0.0009999275207519531
In threading <Thread(Thread-1, started 9956)>
In threading <Thread(Thread-2, started 8524)>
In threading <Thread(Thread-4, started 3404)>
In threading <Thread(Thread-3, started 7692)>
In threading <Thread(Thread-5, started 9524)>

Notice that the reported running time does not include the sub-threads' execution: the main thread does not wait for them. The sub-threads run independently, and the program exits only after they finish.

2. Join
Following on from above, the main thread does not wait for sub-threads; they run in parallel without interfering with each other. When the main thread needs a sub-thread's result, it must block and wait for that sub-thread to finish.

import threading, time

start_time = time.time()

def run(num):
    time.sleep(1)
    print('In the threading :', threading.current_thread())

# t1 = threading.Thread(target=run, args=(1,))
# t2 = threading.Thread(target=run, args=(2,))
# t1.start()
# t2.start()
# t1.join()
# t2.join()
obj_t = []
for i in range(10):
    t = threading.Thread(target=run, args=(1,))
    t.start()
    # t.join()
    obj_t.append(t)

# Note: join() is called in a separate loop after all threads have started.
# Calling t.join() right after t.start() would block on each thread in turn,
# serializing the threads and defeating the point of multithreading. Compare
# the running time with join() inside versus outside the start loop.
for obj in obj_t:
    obj.join()
print('In the main threading', threading.current_thread())

print('Running time :', time.time() - start_time)
'''
Without join(), the main thread does not wait for the sub-threads;
with it, the main thread waits until each sub-thread has finished.
'''

Running result

In the threading : <Thread(Thread-4, started 9912)>
In the threading : <Thread(Thread-2, started 1188)>
In the threading : <Thread(Thread-1, started 7424)>
In the threading : <Thread(Thread-5, started 9348)>
In the threading : <Thread(Thread-3, started 10128)>
In the main threading <_MainThread(MainThread, started 8204)>

3. Daemon threads
Unlike an ordinary sub-thread, which runs independently and in parallel with the main thread, a daemon thread is not waited for: once all non-daemon threads have finished, the program exits immediately, abandoning any daemon threads still running.

import threading, time

def main_run():
    time.sleep(1)
    print('In the main thread', threading.current_thread())
    sub_t = threading.Thread(target=sub_run)
    sub_t.start()

def sub_run():
    time.sleep(2)
    print('In the sub thread ', threading.current_thread())
    daemon_t = threading.Thread(target=daemon_run)
    daemon_t.daemon = True  # mark as a daemon thread (setDaemon() is deprecated)
    daemon_t.start()

def daemon_run():
    time.sleep(3)
    print('In the daemon thread ', threading.current_thread())

main_run()

Running result:

In the main thread <_MainThread(MainThread, started 10064)>
In the sub thread <Thread(Thread-1, started 6716)>

Process finished with exit code 0

The main thread finishes first, then the program waits for the sub-thread. Once the sub-thread completes, the program exits immediately without waiting for the daemon thread.

4. Mutex locks
To prevent multiple threads from corrupting shared data by operating on it at the same time, the data must be protected with a lock where necessary.

import threading, time

lock = threading.Lock()
num = 0

def run():
    global num
    num += 1  # note: the increment happens OUTSIDE the lock, so all five
              # threads have bumped num before most of them get to print it
    lock.acquire()
    print('Current thread:\n', threading.current_thread())
    print('Current num:', num)
    time.sleep(1)
    lock.release()  # only one thread at a time can run the section protected by the mutex


for i in range(5):
    t = threading.Thread(target=run)
    t.start()

Running result

Current thread:
<Thread(Thread-1, started 2348)>
Current num: 1
Current thread:
<Thread(Thread-2, started 6128)>
Current num: 5
Current thread:
<Thread(Thread-3, started 7564)>
Current num: 5
Current thread:
<Thread(Thread-4, started 10104)>
Current num: 5
Current thread:
<Thread(Thread-5, started 10128)>
Current num: 5

5. Semaphores
Following on from above: a mutex allows only one thread at a time to operate on the data, whereas a semaphore lets you specify an upper bound on the number of concurrent threads.

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':  # run only when executed as a script
    semaphore = threading.BoundedSemaphore(2)  # allow at most 2 threads inside the guarded section at once
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:
        pass  # busy-wait until only the main thread is left
    else:
        print('----all threads done---')

Running result:

run the thread: 0
run the thread: 1


run the thread: 2
run the thread: 3


run the thread: 4
run the thread: 5


run the thread: 6
run the thread: 7

run the thread: 8
run the thread: 9

----all threads done---

Each batch printed contains exactly the number of threads allowed by the semaphore value: 2.
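
A side note on the Bounded variant used above, as a minimal sketch: `BoundedSemaphore` additionally guards against calling release() more times than acquire(), which a plain `Semaphore` silently allows (its counter just grows).

```python
import threading

s = threading.Semaphore(2)
s.release()  # a plain Semaphore happily lets its internal counter grow to 3

b = threading.BoundedSemaphore(2)
try:
    b.release()  # one release too many
    over_released = True
except ValueError:
    over_released = False
    print('BoundedSemaphore rejected the extra release')
```

Using the bounded form catches a class of bookkeeping bugs where a lock-like resource is released on a path that never acquired it.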

6. Deadlock
When multiple mutexes (synchronization locks) are used at the same time and conflict with each other, threads can deadlock:

import threading
import time


def foo():
    lockA.acquire()
    print('func foo ClockA lock')
    lockB.acquire()
    print('func foo ClockB lock')
    lockB.release()
    print('func foo ClockB release')
    lockA.release()
    print('func foo ClockA release')


def bar():

    lockB.acquire()
    print('func bar ClockB lock')
    time.sleep(2)

    """
    sleep() simulates an I/O operation. Thread 1 finishes foo first and, now in
    bar, holds lockB. When thread 2 reaches the middle of foo, lockB is still
    held, so thread 2 blocks while holding lockA. After the two-second sleep,
    thread 1 tries to acquire lockA below, but lockA is held by the blocked
    thread 2, which cannot release it because it is itself blocked on lockB.
    Neither side can proceed: a deadlock.
    """
    lockA.acquire()
    print('func bar ClockA lock')
    lockB.release()
    print('func bar ClockB release')
    lockA.release()
    print('func bar ClockA release')


def run():
    foo()
    bar()


l = []
lockA = threading.Lock()
lockB = threading.Lock()
for i in range(10):
    t = threading.Thread(target=run, args=())
    t.start()
    l.append(t)

for t in l:
    t.join()

Execution blocks when the threads reach bar(); the result is below, and the cause is explained in the comment inside the code above:

func foo ClockA lock
func foo ClockB lock
func foo ClockB release
func foo ClockA release
func bar ClockB lock
func foo ClockA lock
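
One standard cure for this kind of deadlock is a fixed lock ordering: every thread acquires the two locks in the same order, so no circular wait can form. A minimal sketch (not the author's code, same lock names for comparison):

```python
import threading, time

lockA = threading.Lock()
lockB = threading.Lock()

def foo():
    with lockA:      # every function takes lockA first ...
        with lockB:  # ... and lockB second
            pass

def bar():
    with lockA:      # same order as foo, so no circular wait is possible
        time.sleep(0.01)
        with lockB:
            pass

threads = [threading.Thread(target=f) for f in (foo, bar) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('all threads finished, no deadlock')
```

The `with lock:` form also guarantees release on exceptions, which bare acquire()/release() pairs do not.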

7. RLock (reentrant lock)
To let the same thread acquire the same resource multiple times without deadlocking against itself, Python provides the reentrant lock RLock. Internally an RLock maintains a Lock and a counter; the counter records how many times acquire() has been called, so the owning thread may acquire repeatedly. Only after the owning thread has released every acquire() can other threads obtain the lock. For example:
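 
Before the full example, the reentrancy property in isolation, as a minimal sketch (names `outer`/`inner` are illustrative): the same thread acquires the same RLock twice; with a plain Lock the inner acquire would block forever.

```python
import threading

rlock = threading.RLock()
order = []

def outer():
    with rlock:        # first acquire: the internal counter goes to 1
        inner()
    order.append('outer done')

def inner():
    with rlock:        # same thread acquires again: counter 2, no deadlock
        order.append('inner done')

outer()
print(order)  # ['inner done', 'outer done']
```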

import threading, time

lock = threading.Lock()
# rlock = threading.RLock()
num = 0

def sub_thread_run():
    print('Come into the second lock')
    lock.acquire()
    # rlock.acquire()
    global num
    num += 1
    print('Current num in the sub thread :', num)
    lock.release()
    # rlock.release()
    print('Unlock the second lock')

def run():
    global num
    while num < 4:
        num += 1
        lock.acquire()
        print('Current num in the main:', num)
        # Note: target=sub_thread_run() CALLS the function right here, in the
        # current thread (the Thread object ends up with target=None). So the
        # main thread tries to acquire the same Lock it already holds: deadlock.
        t = threading.Thread(target=sub_thread_run())
        t.start()
        lock.release()

run()

running result:

Current num in the main: 1
Come into the second lock
...

It gets stuck inside the second lock and never comes out. Switching to the reentrant lock makes it run normally:

import threading, time

rlock = threading.RLock()
num = 0

def sub_thread_run():
    print('Come into the second lock')
    rlock.acquire()  # same thread, same RLock: the counter goes to 2 instead of deadlocking
    global num
    num += 1
    print('Current num in the sub thread :', num)
    rlock.release()
    print('Unlock the second lock')

def run():
    global num
    while num < 4:
        num += 1
        rlock.acquire()  # first acquisition by the main thread
        print('Current num in the main:', num)
        # target=sub_thread_run() still calls the function in the current
        # thread, so the same thread acquires rlock a second time above
        t = threading.Thread(target=sub_thread_run())
        t.start()
        rlock.release()


run()

running result:

Current num in the main: 1
Come into the second lock
Current num in the sub thread : 2
Unlock the second lock
Current num in the main: 3
Come into the second lock
Current num in the sub thread : 4
Unlock the second lock

Process finished with exit code 0

8. Event
Event-based signalling: a flag is shared among threads, and threads pass signals to one another through it to trigger the next action, coordinating their work.
A small game:
stop on red, go on yellow or green

import threading, time

event = threading.Event()  # create the event instance

def redlight():
    event.set()  # set the event flag
    num = 1
    while True:

        if 0 < num <= 2:
            print('\033[0;32mGreen light on\033[0m')
        elif num <= 3:
            print('\033[0;33m Yellow light on\033[0m')
        elif num <= 5:
            print('\033[0;31m Red light on\033[0m')
            if event.is_set():
                event.clear()  # clear the event flag
        else:  # when num > 5
            num = 0  # reset num to zero
            event.set()
        num += 1
        time.sleep(1)


def car(car_num):
    while True:
        if event.is_set():  # returns a bool, True or False
            print('car %i is running.' % car_num)
            time.sleep(1)
        else:
            print('Red light on ,car %i waiting... ' % car_num)
            time.sleep(1)

task_redlight = threading.Thread(target=redlight)
task_redlight.start()

for i in range(2):
    car_task = threading.Thread(target=car, args=(i,))
    car_task.start()

running result:

Green light on
car 0 is running.
car 1 is running.
Green light on
car 0 is running.
car 1 is running.
Yellow light on
car 0 is running.
car 1 is running.
Red light on
Red light on ,car 0 waiting...
Red light on ,car 1 waiting...
Red light on
Red light on ,car 0 waiting...
Red light on ,car 1 waiting...
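
The car threads above poll is_set() once a second. Event also offers a blocking wait(), so a waiter sleeps until the flag is set instead of polling; a minimal sketch (short sleeps stand in for the traffic-light timing):

```python
import threading, time

event = threading.Event()
log = []

def car(car_num):
    event.wait()  # block until the flag is set; no polling loop needed
    log.append('car %i is running.' % car_num)

t = threading.Thread(target=car, args=(0,))
t.start()

time.sleep(0.2)   # the light stays red for a moment
event.set()       # green light: set() wakes every waiter at once
t.join()
print(log[0])  # car 0 is running.
```

wait() also accepts a timeout and returns the flag's value, which is handy when a waiter should give up after a while.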

9. Queue
What queues are for:
1. Decoupling: the parts of a program become loosely coupled.
2. Efficiency: producers and consumers need not block on each other.

The producer/consumer pattern:

import threading, time, queue

q = queue.Queue()

def producer():
    for i in range(1, 6):
        q.put('包子 %i' % i)        # put a baozi (steamed bun) into the queue
        print('包子 %i 熟了' % i)    # "baozi %i is ready"
        time.sleep(0.5)
    print('今天包子卖完了')          # "sold out for today"

def consumer(name):
    while True:
        baozi = q.get()
        print('%s is eating %s' % (name, baozi))
        time.sleep(1)


p = threading.Thread(target=producer)
p.start()

c1 = threading.Thread(target=consumer, args=('ywq',))
c2 = threading.Thread(target=consumer, args=('cqy',))
c1.start()
c2.start()

running result:

包子 1 熟了
ywq is eating 包子 1
包子 2 熟了
cqy is eating 包子 2
包子 3 熟了
ywq is eating 包子 3
cqy is eating 包子 4
包子 4 熟了
ywq is eating 包子 5
包子 5 熟了
今天包子卖完了
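
One rough edge in the example above: the consumer threads spin forever on a blocking get(). Queue also provides task_done()/join(), so the producing side can wait until every item has actually been processed; a minimal sketch (daemon consumers so the program can exit):

```python
import threading, queue

q = queue.Queue()
eaten = []

def consumer():
    while True:
        item = q.get()       # blocks until an item is available
        eaten.append(item)
        q.task_done()        # tell the queue this item is fully processed

for _ in range(2):
    threading.Thread(target=consumer, daemon=True).start()  # daemons don't block exit

for i in range(1, 6):
    q.put('baozi %i' % i)

q.join()                     # wait until task_done() was called for every put item
print('all %d baozi consumed' % len(eaten))
```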

III. Processes

CPython's GIL means Python's multithreading is effectively pseudo-multithreading: only one thread executes at a time, and the illusion of concurrency comes from frequent, timed context switches. In CPU-bound workloads this constant switching can even make multithreaded code slower, so multithreading suits I/O-bound workloads. To actually exploit multiple cores, since threads cannot, use multiple processes and occupy every core.
The multiprocessing module's API mirrors that of threading:

1. Creating a process:

import multiprocessing, os

def run():
    print('run pid :', os.getpid())
    print('run father pid:', os.getppid())
    f = multiprocessing.Process(target=sub_run)
    f.start()

def sub_run():
    print('sub_run father pid:', os.getppid())

if __name__ == '__main__':
    p = multiprocessing.Process(target=run)
    p.start()
    print('father pid of main process:', os.getppid())  # the IDE's process

running result:

father pid of main process: 11524
run pid : 12320
run father pid: 13292
sub_run father pid: 12320

2. Inter-process communication
2.1 Queue

#Author :ywq
from multiprocessing import Queue, Process
import time

queue = Queue()

def sub_run(arg):
    while arg.qsize() > 0:
        print('get', arg.get())
        time.sleep(0.5)


def run():
    for i in range(97, 102):
        queue.put(chr(i))
    p = Process(target=sub_run, args=(queue,))
    p.start()
    p.join()

if __name__ == '__main__':
    run()

running result:

get a
get b
get c
get d
get e

2.2 Pipe
Pipe-based transfer, with a socket-like API:

#Author :ywq
from multiprocessing import Process, Pipe

# Pipe() returns two connection objects, one for each end of the pipe. Think
# of them as a pair of walkie-talkies created in the parent process: one end
# is handed to the child when it is spawned, and the two processes then talk
# through them.
local, peer = Pipe()

def send(pipe):
    pipe.send(['hello'])
    pipe.close()

def recv():
    p = Process(target=send, args=(peer,))
    p.start()
    print(local.recv())  # prints "['hello']"
    p.join()


if __name__ == '__main__':
    recv()
    print(local)
    print(peer)

running result:

['hello']
<multiprocessing.connection.PipeConnection object at 0x00000000028524A8>
<multiprocessing.connection.PipeConnection object at 0x0000000000D734A8>

2.3 Manager
The Queue and Pipe approaches above pass messages between two processes; unlike threads, processes cannot simply operate on the same data in memory, because each process's memory is independent. A Manager works through an intermediary process with proxies that keep the shared data synchronized, so multiple processes can operate on the same data at once.
A Manager object supports data types including: list, dict, Value, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue...

#Author :ywq
from multiprocessing import Process, Manager

def f(d, l):
    d['a'] = 'yes'
    l.append(0)
    print(l)

if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()  # renamed to avoid shadowing the builtins dict/list
    shared_list = manager.list()
    p_list = []
    for i in range(5):
        p = Process(target=f, args=(shared_dict, shared_list))
        p.start()
        p_list.append(p)

    for sub_process in p_list:
        sub_process.join()

    print(shared_list)
    print(shared_dict)

running result:

[0]
[0, 0]
[0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
{'a': 'yes'}

3. Process pools
Spawning a child process clones the parent's resources, so creating a process costs far more than creating a thread. To keep multiprocessing from exhausting system resources, a process pool caps how many child processes may run at the same time.

#Author :ywq
from multiprocessing import Pool
import os, time


def callback(arg):
    # the callback is executed by the MAIN process, receiving the worker's return value
    print('\033[0;33mThe callback func pid:%s \033[0m' % os.getpid())

def sub_run(num):
    print('Num:', num)
    print('\033[0;32mSub process is running:%s \033[0m' % os.getpid())
    time.sleep(1)

def run():
    pool = Pool(processes=2)  # a Pool instance capped at 2 worker processes
    print('\033[0;31mMain process running: %s \033[0m ' % os.getpid())
    for i in range(5):
        # pool.apply(func=sub_run, args=(i,))        # run the workers serially
        # pool.apply_async(func=sub_run, args=(i,))  # run the workers in parallel
        pool.apply_async(func=sub_run, args=(i,), callback=callback)
        # after func finishes, the callback runs in the main process
    pool.close()  # close() must come before join(); the API requires it
    pool.join()   # without join(), the main process won't wait for the pool's workers

if __name__ == '__main__':  # on Windows, Pool/Process creation must be guarded by __name__ == '__main__'
    run()

running result:

Main process running: 19436  
Num: 0
Sub process is running:18988
Num: 1
Sub process is running:18928
Num: 2
Sub process is running:18988
The callback func pid:19436
The callback func pid:19436
The callback func pid:19436

IV. Benchmarks
4.1 CPU-bound computation: single process vs. multithreading
Multithreaded:

#Author :ywq
import threading, time

num = 0

def calc():
    global num
    a = num
    num += 1
    num += a


start_time = time.time()

for i in range(100000):
    t = threading.Thread(target=calc)
    t.start()
    # calc()

# print(num)  # too large to bother printing
print(time.time() - start_time, ' seconds')

result:

16.822962284088135 seconds

Single process:
Comment out the Thread lines and uncomment calc() instead.
result:

0.5590319633483887  seconds

The multithreaded computation took about 16 s; the single process took 0.55 s. A huge gap.

4.2 I/O-bound workload: single process vs. multithreading

Single process:

#Author :ywq
import threading, time, os

def write():
    f = open('test.txt', 'a')
    f.write('just for test \n')
    f.close()

start_time = time.time()
for i in range(30000):
    # t = threading.Thread(target=write)
    # t.start()
    write()

print(time.time() - start_time, 'seconds')

result:

12.502715110778809 seconds

Multithreaded:

#Author :ywq
import threading, time, os

def write():
    f = open('test.txt', 'a')
    f.write('just for test \n')
    f.close()

start_time = time.time()
for i in range(30000):
    t = threading.Thread(target=write)
    t.start()
    # write()

print(time.time() - start_time, 'seconds')

result:

9.011515378952026 seconds

Multiprocess:
Start three processes, each writing 10,000 lines, 30,000 lines in total.

#Author :ywq
import time, os
from multiprocessing import Process

def write():
    print('Pid:', os.getpid())
    for i in range(10000):
        f = open('test.txt', 'a')
        f.write('just for test \n')
        f.close()


if __name__ == '__main__':
    start_time = time.time()
    p_list = []
    for i in range(3):
        p = Process(target=write)
        p.start()
        p_list.append(p)

    for process in p_list:
        process.join()

    print(time.time() - start_time, 'seconds')

result:

Pid: 20516
Pid: 21564
Pid: 21600
5.744328498840332 seconds

Summary:

Because of the GIL it adopted early on, (C)Python switches to another thread after a certain number of bytecode instructions have run; this rapid, frequent context switching produces pseudo-multithreading in which only one thread is ever actually computing. Python's multithreading is therefore a poor fit for CPU-bound workloads but a good one for I/O-bound workloads, which is why multiprocessing is introduced. Processes, however, cannot share data as conveniently as threads: they must communicate through queues or pipes, or share data through an intermediary such as a Manager, and a process costs far more to create than a thread. Choose according to the workload.
