多进程

思维导图:

多进程multiprocess模块

定义引入:

  • 进程:就是操作系统执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有其对应的地址空间数据栈以及其他用于跟踪进程执行的辅助数据。进程可以通过fork或者spawn创建新的进程,不过新的进程拥有单独的内存空间,所以不同进程之间要通过通信机制(IPC)实现数据共享,常见的通信机制包括:管道信号套接字共享内存区等。
  • 线程:是cpu调度的执行单元,线程是定义在同一个进程下的,共享相同的上下文,所以相对于进程而言,线程之间的信息共享和通信更容易。线程共享cpu的执行时间

下图给出了进程和线程的资源对比

进程

线程
多个线程只是具有独立的栈空间,其他数据是共享的。

下面给出一个类比例子

进程的创建
Python的os模块中有fork(),通过fork可以在Linux\Unix\Mac系统下创建新的进程。但是Windows下并不支持这种方法。

multiprocessing模块跨平台的多进程模块

  1. 在介绍创建进程的方法之前,先来看一下Process类

  2. 创建进程方法1:以指定的函数作为target,创建Process对象来创建新的进程。

代码 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time,os


def fun(name):
print("Run child process is %s(%s)" % (name, os.getpid()))
print("hello", name)


if __name__ == "__main__":
print("Parent Process %s" % os.getpid())
p = Process(target=fun, args=("xyq",))
print("Child process will strat.")
p.start()
p.join()
print("Parent finish!")

结果

:此模块不支持交互式平台,所以可以在命令行或者pycharm执行。


再看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
from multiprocessing import Process
import time,os


def fun(name):
print("Run child process is %s(%s)" % (name, os.getpid()))


if __name__ == "__main__":
print("Parent Process %s" % os.getpid())
for i in range(8):
p = Process(target=fun, args=(i+1,))
p.start()
for p in multiprocessing.active_children():
print("child process name is :"+p.name+"chile ID is :"+str(p.pid))
print("waiting all Child process finish")

for p in multiprocessing.active_children():
p.join()

结果

看见子进程创建与调度与其创建顺序无关。

:使用循环创建多个进程。使用active_children()方法获取所有的子进程。使用p.name获取进程名,使用p.pid获取进程id.

  1. 继承Process类创建一个进程

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
from multiprocessing import Process
import time,os


class My_process(Process):
def __init__(self,name):
super().__init__()
self.name = name

def run(self):
print("run child process is %s(%s)"%(self.name, os.getpid()))


if __name__ == "__main__":
print("Parent Process %s" % os.getpid())

p = My_process("xy1")
p.start()
p.join()

结果:

注意
1、使用继承方法是构造方法使用:super().__init__
2、要重写run方法。

进程池

pool类:进程池,可以批量创建和管理子进程。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing
from multiprocessing import Pool
import time,os,random


def fun(name):
print("Run child process is %s(%s)" % (name, os.getpid()))
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print("child process %s runs %.2f seconds"%(name, (end-start)))

if __name__ == "__main__":
print("Parent Process %s" % os.getpid())
p = Pool(3)
for i in range(3):
# 异步非阻塞式,根据系统调度进行进程的切换
p.apply_async(fun, args=(i, ))

# 在调用join时需要先关闭进程池
p.close()

# 等待全部子进程执行完毕后
p.join()

结果:

如果使用p.apply采取阻塞方法,会使进程依次执行,看结果:


也可以使用with方法进行进程池的创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing
from multiprocessing import Pool
import time,os,random


def fun(name):
myname = 0
for i in range(name):
print("(%s)process is running:%d"%(os.getpid(), i))
myname+=1
return myname

if __name__ == "__main__":
print("Parent Process %s" % os.getpid())
with Pool(3) as p:
result = p.map(fun, (3, 6, 9))
for item in result:
print(item)

注意:使用p.map()方法与前差不多,都是将参数依次传入函数。这里是创建了3个进程。

concurrent.futures模块下的ProcessPoolExecutor类

实现多进程并发编程:提高运行效率

一个例子:大素数求解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import concurrent.futures
import math
from time import time

def is_prime(n):
if n<2:
return False
if n==2:
return True
if n%2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1, 2):
if n%i ==0:
return False
return True

PRIMES = [
112272535095293,
112582705942171,
1099726899285419
]

def main():
with concurrent.futures.ProcessPoolExecutor() as pool:
# 提交给pool的是一个函数,如果是map,提交方法为pool.map
for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
print("%d is prime?%s"%(number, prime))

if __name__ == "__main__":
start = time()
main()
end = time()
print(end-start)

看结果:

不同的pool参数也会对运行效率有影响。

不一定是参数越大效率越高,因为会花费时间在进程切换上。一般参数= 任务个数最好

进程通信

多个进程之间进行通信,数据的共享

一个例子理解多进程的空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process
import time

counter = 0
def sub_task(string):
global counter
while counter<10:
print(string, end=' ')
counter+=1
time.sleep(0.1)

def main():
Process(target=sub_task, args=("ping", )).start()
Process(target=sub_task, args=("pong", )).start()

if __name__ == "__main__":
main()

结果:

解释:
在创建进程时每一个子进程都会复制父进程的数据,都会有一个单独的进程空间。所以counter是各个进程都有的,单独存在。

使用Queue类来进行进程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os
from multiprocessing import Process,Queue
import time

counter = 0
def fun(q):
print("(%s)进程开始从队列传输数据:"%os.getpid())
q.put("hello")

if __name__ == "__main__":
q = Queue()

p = Process(target=fun, args=(q, ))
p.start()
print("(%s)进程开始从队列读取数据:"%os.getpid())
print(q.get())
p.join()

看结果:

两个子进程之间的通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os
from multiprocessing import Process,Queue
import time

counter = 0
def write(q):
print("(%s)进程开始从队列传输数据:"%os.getpid())
for i in ['a','b','c']:
q.put(i)
time.sleep(0.2)
def read(q):
print("(%s)进程开始从队列读取数据:"%os.getpid())
while True:
i = q.get(True)
print("Get %s from queue"%i)



if __name__ == "__main__":
q = Queue()

p1 = Process(target=write, args=(q, ))
p2 = Process(target=read, args=(q, ))
p1.start()
p2.start()


p1.join()
p2.join()
# 死循环,强制终止
p2.terminate()

结果:

多线程

思维导图如下:

threading模块

模块介绍
_Thread模块:低级模块
threading模块:高级模块,对_Thread模块进行了封装。

模块基本功能(部分)

  • threading.stack_size():返回当前线程大小
  • threading.stack_size(xx):设置当前线程大小
  • threading.active_count():查看存活线程数目
  • threading.enumerate():枚举当前活动的线程,以列表形式返回
  • threading.current_thread().name:返回当前线程的名称

示例

1
2
3
4
5
6
7
8
import threading
print(threading.stack_size())
threading.stack_size(64*1024)
print(threading.stack_size())

print(threading.active_count())
print(threading.enumerate())
print(threading.current_thread().name)

结果

创建线程的方法

  1. 使用threading模块的Threat类的构造方法创建线程,把一个函数传给target创建的Thread示例,再调用start方法启动线程
  2. 通过继承threading的Thread类创建进程

第一种方法:threading.Thread类创建

先来看一下Thread类的构造方法

中文

代码实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
import time


def fun(num):
for i in range(num):
print("Thread %s >>> %s"%(threading.current_thread().name, i))
time.sleep(2)
print("Thread %s ended"%threading.current_thread().name)

if __name__ == "__main__":
print("Thread %s is runing..."%threading.current_thread().name)
t = threading.Thread(target=fun, args=(5,), name="Thread_t")
t.start()
t.join()
print("Thread %s ended"%threading.current_thread().name)

运行截图

注意事项
1、在创建进程时传入参数args需要使用元组,如果是单子,要用(xxx,)形式
2、区分threading方法和Thread方法。


join方法解析
t.join()就是截断当前进程,当子线程运行完毕后才继续运行。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time


def fun(num):
for i in range(num):
print("Thread %s >>> %s"%(threading.current_thread().name, i))
time.sleep(0.5)
print("Thread %s ended"%threading.current_thread().name)

if __name__ == "__main__":
print("Thread %s is runing..."%threading.current_thread().name)
t1 = threading.Thread(target=fun, args=(5,), name="Thread_t1")
t1.start()
t1.join()
t2 = threading.Thread(target=fun, args=(5,), name="Thread_t2")
t2.start()
t2.join()
print("Thread %s ended"%threading.current_thread().name)

运行截图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time


def fun(num):
for i in range(num):
print("Thread %s >>> %s"%(threading.current_thread().name, i))
time.sleep(0.5)
print("Thread %s ended"%threading.current_thread().name)

if __name__ == "__main__":
print("Thread %s is runing..."%threading.current_thread().name)
t1 = threading.Thread(target=fun, args=(5,), name="Thread_t1")
t1.start()
t1.join()
t2 = threading.Thread(target=fun, args=(5,), name="Thread_t2")
t2.start()
# t2.join()
print("Thread %s ended"%threading.current_thread().name)

运行截图

解释
1、使用t.join()会其下的代码停止执行,对应线程执行完毕后再继续执行,见eg1。
2、如果不使用join方法就会使得多个线程共同执行(虽然还是一个时间执行一个,但是会在不同线程之间进行切换)见eg2。

第二种方法:继承threading.Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
import time

class myThread(threading.Thread):
def __init__(self, num, threadname):
super().__init__(name = threadname)
self.__num = num
def run(self):
time.sleep(0.5)
print(self.__num)

t = myThread(6, "myThread")
t.daemon = True
t.start()
t.join()
print("thread %s ended"%threading.current_thread().name)

运行结果

注意事项
1、继承方法创建进行本质上与第一种方法一致,不过可以在构造方法中设置需要的变量
2、在定义构造方法时先使用父类构造方法,传入进程名。
3、重写子类的run方法实现功能的自定义。(注:第一种方法也是通过run方法执行传入的函数的

锁LOCK

定义引入

临界资源:因为多个线程共享资源,如果此资源被多个进程竞争使用,即为“临界资源

锁的存在就是为临界资源添加一个保护,防止资源混乱。

一个经典例子:银行取钱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

balance = 100000000
def draw(draw_account):
global balance
if balance >= draw_account:
print(threading.current_thread().name +"取前成功,取出的钱数为:"+ str(draw_account))
time.sleep(0.001)
balance-=draw_account
print("余额:", balance)
else:
print(threading.current_thread().name +"取钱时余额不足")

t1 = threading.Thread(target=draw, name="甲", args=(60000000,))
t2 = threading.Thread(target=draw, name="乙", args=(60000000,))
t1.start()
t2.start()
t1.join()
t2.join()

看结果

可见,多个线程竞争balance这个临界资源,造成逻辑错误。其错误来源:甲取钱后输出信息,而后在balance数值未变时乙开始取钱。


改进方法:锁

方法介绍

threading模块提供了Lock\RLock两个类,他们都提供了以下方法加锁和释放锁。

  • x.acquire(locking=True,timeout=-1)方法:获取锁,timeout设置加锁多少秒
  • x.release()方法:释放锁

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

import threading
import time

balance = 100000000
lock = threading.Lock() # 创建一个锁
def draw(draw_account):
global balance
lock.acquire()
try:
if balance >= draw_account:
print(threading.current_thread().name +"取前成功,取出的钱数为:"+ str(draw_account))
time.sleep(0.001)
balance-=draw_account
print("余额:", balance)
else:
print(threading.current_thread().name +"取钱时余额不足")
finally:
lock.release()

t1 = threading.Thread(target=draw, name="甲", args=(60000000,))
t2 = threading.Thread(target=draw, name="乙", args=(60000000,))
t1.start()
t2.start()
t1.join()
t2.join()

看结果:

注意点
1、在使用前要先创建一个锁,使用Lock类
2、使用try...finally...进行锁的释放
3、锁会把临界资源空间锁定,释放后别的线程才可以进行访问。?????????????

死锁

锁有优点,可以保证临界资源的安全性,但是也会造成死锁。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
import time

class Account():
def __init__(self, _id, balance, lock):
self.id = _id
self.balance = balance
self.lock = lock

def withdraw(self, mny):
self.balance -= mny

def deposit(self, mny):
self.balance += mny

def Trans(_from, to, mny):
if _from.lock.acquire():
_from.withdraw(mny)
time.sleep(1)
print("wait lock...")

if to.lock.acquire():
to.deposit(mny)
to.release()
_from.release()
print("finish")
a = Account("甲", 1000,threading.Lock())
b = Account("乙", 1000,threading.Lock())
threading.Thread(target=Trans, args=(a, b, 300)).start()
threading.Thread(target=Trans, args=(b, a, 300)).start()

看结果

可以在acquire方法时设置时间,超出某个规定时间后就会释放锁。

线程通信

思维导图如下:

Condition实现线程通信

作用流程:实现变量同步

  • 创建condition对象
  • 通过acquire获取锁,然后判断一些条件
    • 若条件满足,处理后通过notify方法通知唤醒其他线程,其他处于wait的线程会重新判断条件
    • 不满足条件,wait阻塞该线程。
  • 最后使用release释放锁

一个例子:生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import threading,time

class Producer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
global count
while True:
with con:
if count > 100 :
con.wait()
else:
count += 50
print(self.name+'produce 50,count='+str(count))
con.notify()

class Consumer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)

def run(self):
global count
while True:
with con:
if count < 100:
con.wait()
else:
count -= 25
print(self.name + 'consumer 25,count=' + str(count))
con.notify()


count=200
con = threading.Condition()# 创建condition对象
def test():
for i in range(2):
p = Producer("producer")
p.start()
for i in range(3):
c = Consumer("consumer")
c.start()

if __name__ == "__main__":
test()

结果

具体分析:
Condition实现线程通信,其原理与锁类似,不过在具体实现生有所不同。在condition中加入了判别机制,当满足某些条件时获取锁,其余线程wait。并在每一次完成后唤醒其他线程。

具体流程图

queue实现线程通信

模块整体介绍:

queue模块下有三种队列类,分别如下:

  • queue.Queue:先进先出FIFO的常规队列、
  • queue.LifoQueue:后进先出的队列
  • queue.PriorityQueue:优先级队列,优先级最小的先出队

三种队列类提供的方法基本相同,下面以queue.Queue类为例:

  • qsize(self):队列的实际大小
  • empty(self):判定队列空,空返回True,否则返回False
  • full(self):判定队列满,满返回True,否则返回False
  • put(self, item, block=True, timeout=None):往队列中放入元素。如果队列满,block=True (阻塞方式) ,则当前线程被阻塞;如果队列满,block=False (非阻塞方式) ,则抛出异常queue.FULL
  • put nowait(self, item):往队列放入元素,采用非阻塞方式
  • get(self, block=True, timeout=None) :往队列中取出元素。如果队列空,block=True (阻塞方式) ,则当前线程被阻塞;如果队列空,block=False (非阻塞方式) ,则抛出异常queue.EMPTY
  • get nowait(self):往队列取出元素,采用非阻塞方式,
  • task_ done(self):前面的任务已经完成,用在队列的消费者线程中。get方法之后调用task_done方法告诉队列处理的任务完成了
  • join(self):队列阻塞,直到队列中所有的元素都被处理完毕。

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading,time
import queue
class Producer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)

def run(self):
for i in range(5):
time.sleep(2)
myQueue.put(i)
print(self.name, " put ", i, 'to queue')
myQueue.put(None)

class Consumer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)

def run(self):
while True:
time.sleep(3)
item = myQueue.get()
if item == None:
break
print(self.name, " get ", item, ' from queue')


myQueue = queue.Queue()

p = Producer("producer")
c = Consumer("consumer")
c.start()
p.start()
c.join()
p.join()

结果:

具体介绍:
1、queue就是生成一个共享的队列,各个线程可以向这个队列里取元素也可以放元素。
2、可以在线程中使用while True死循环保证线程的执行,而后通过if ... break的方法结束线程

join方法的具体介绍

整体流程:

Event实现线程通信

模块介绍

一个例子:模仿多线程连接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
def worker(event):
while not event.is_set():
logging.debug('Waiting for redis ready...')
event.wait()
logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime( ))
time. sleep(1)

readis_ready = threading.Event()
for i in range(3):
t = threading.Thread( target=worker, args=(readis_ready,))
t.start()
logging.debug('first of all, check redis server, make sure it is 0K')
time. sleep(3)
readis_ready.set()

结果:

具体介绍
1、使用readis_ready = threading.Event()创建一个event.
2、在创建进程时将event作为参数传入
3、使用 event.wait()阻塞进程。readis_ready.set()解除阻塞。
4、此方法无锁。

最后补充:


线程私有变量

思维导图:

定义介绍:

在多线程环境下,如果多个线程对全局变量进行了修改,将会影响到其他所有的线程。为了避免多个线程同时对变量进行修改,引入了线程同步机制,通过互斥锁、条件变量等来控制对全局变量的访问。

但是在很多时候线程也需要拥有自己的私有数据,可以使用局部变量方式,同时python还提供了ThreadLocal变量,它本身是一个全局变量, 但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的.

具体方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading , time
#创建全局ThreadLocal对象:
local_data = threading.local()
def fun(n):
for i in range(n):
try:
local_data.num += i
except:
local_data.num = i
time. sleep(1)
print('\n%s local_data is : %s' % (threading.current_thread().name, local_data.num) )
threads=[]
t1 = threading.Thread(target=fun, name='Thread1', args=(5, ))
threads.append(t1)
t2 = threading.Thread(target=fun, name='Thread2', args=(5,))
threads.append(t2)
for t in threads:
t.start()
t.join()

结果

合理使用threading.local可以极大的简化代码逻辑,保证各个子线程的数据安全。

ThreadPoolExecutor并发编程

思维导图:

定义引入:

由于启动新线程时,涉及到和操作系统的交互,因此启动新线程的成本比较高,为了提高性能,可以使用线程池来管理线程。
线程池在启动时创建大量的空闲线程,我们的程序只要将一个函数提交线程池, 线程池就会启动个空闲的线程来执行它。当函数结束后,这个线程并不会死亡,而是回到线程池中成为空闲状态,等待执行下一个函数。通过线程池可以控制系统中并发线程的数量。

一般步骤:
1、调用ThreadPoolExecutor类的构造函数创建一个线程池
2、定义普通函数作为线程任务
3、调用ThreadPoolExecutor 对象的submit()方法提交线程池任务
4、通过shutdown方法关闭线程池

具体使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading,time
from concurrent.futures import ThreadPoolExecutor
#创建全局ThreadLocal对象:
local_data = threading.local()
def fun(n):
for i in range(n):
try:
local_data.num+= i
except:
local_data.num = i
print('\n%s local_data is : %s' % (threading.current_thread().name, local_data.num))
time.sleep(1)
pool = ThreadPoolExecutor(max_workers=2)
task1 = pool.submit(fun, 5) #提交1个任务
task2 = pool.submit(fun, 10) #提交1个任务
time. sleep(10)
print("task1:", task1.done())
print("task2:", task2.done())
pool.shutdown()

结果:

注意点:
1、不需要再定义线程,而是通过线程池创建多个线程的集合。
2、通过submit向线程池里传递任务。

使用with和map进行扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading,time
from concurrent.futures import ThreadPoolExecutor
#创建全局ThreadLocal对象:
local_data = threading.local()
def fun(n):
for i in range(n):
try:
local_data.num+= i
except:
local_data.num = i
print('\n%s local_data is : %s' % (threading.current_thread().name, local_data.num))
time.sleep(1)
with ThreadPoolExecutor(max_workers=2) as pool:
task = pool.map(fun, (5,10)) #提交1个任务

time. sleep(10)
for i in task:
print(i)

1、with可以用于管理池,不需要显式关闭
2、map同前。

扩展积累

实训作业

素数个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import math
from multiprocessing import cpu_count
from multiprocessing import Pool


def isPrime(n):
# 判断数字是否为素数
# 请在此处添加代码 #
# *************begin************#
if n <= 1:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True

# **************end*************#

def howMany(T):
# 计算给定区间含有多少个素数
# 请在此处添加代码 #
# *************begin************#
sum = 0;
for i in range(T[0], T[1] + 1):
if isPrime(i):
sum += 1
return sum
# **************end*************#

def separateNum(N, CPU_COUNT):
# 对整个数字空间N进行分段CPU_COUNT
# 请在此处添加代码 #
# *************begin************#
list = [[i * (N // CPU_COUNT) + 1, (i + 1) * (N // CPU_COUNT)] for i in range(0, CPU_COUNT)]
list[0][0] = 1
if list[CPU_COUNT - 1][1] < N:
list[CPU_COUNT - 1][1] = N
return list

# **************end*************#

if __name__ == '__main__':
N = int(input())
# 多进程
CPU_COUNT = cpu_count() ##CPU内核数 本机为8
pool = Pool(CPU_COUNT)
sepList = separateNum(N, CPU_COUNT)
result = []
for i in range(CPU_COUNT):
result.append(pool.apply_async(howMany, (sepList[i], )))
pool.close()
pool.join()
# ans = 0
list = [res.get() for res in result]
print(sum(list), end = '')

注意点
1、判断素数的函数。
2、分区结果为区间的首尾,而不是全部。注意使用列表生成式。
3、进程池的使用,看函数与上介绍。(有返回值)

多线程:合数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
import math
ans = 0
lock = threading.Lock()


def isPrime(n):
# 判断数字是否为素数

global ans
if n <= 1:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True




def howMany(T):
# 计算给定区间含有多少个素数

sum = 0
for i in range(T[0], T[1] + 1):
if isPrime(i):
sum += 1
lock.acquire()
try:
global ans
ans += sum
finally:
lock.release()

def seprateNum(N, CPU_COUNT):
# 对整个数字空间N进行分段CPU_COUNT
list = [[i * (N // CPU_COUNT) + 1, (i + 1) * (N // CPU_COUNT)] for i in range(0, CPU_COUNT)]
list[0][0] = 1
if list[CPU_COUNT - 1][1] < N:
list[CPU_COUNT - 1][1] = N
return list

if __name__ == '__main__':
N = int(input())
threadNum = 32
# 请在此处添加代码 #
# *************begin************#
result = []
seqList = seprateNum(N, threadNum)
for i in range(0,threadNum):
result.append(threading.Thread(target=howMany,args=(seqList[i],)))
result[i].start()
for i in range(0,threadNum):
result[i].join()
print(N-1-ans)

# **************end*************#

注意点:
1、多线程的使用。(未使用线程池,而是多个线程,启动+join)
2、锁的使用,在更新ans时需要使用锁。

锁的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import threading
import sys
import time


def showfoo(n):
'''

:param n: 要输出foobarpython的次数
:return: 无返回,可直接输出
'''
# 请在此处添加代码 #
# *************begin************#

for i in range(n):
lockpython.acquire()
print('foo', end='')
lockfoo.release()
time.sleep(0.2)


# **************end*************#


def showbar(n):
'''

:param n: 要输出foobarpython的次数
:return: 无返回,可直接输出
'''
# 请在此处添加代码 #
# *************begin************#
for i in range(n):
lockfoo.acquire()
print('bar', end='')
lockbar.release()
time.sleep(0.2)


# **************end*************#

def showpython(n):
'''

:param n: 要输出foobarpython的次数
:return: 无返回,可直接输出
'''
# 请在此处添加代码 #
# *************begin************#
for i in range(n):
lockbar.acquire()
print('python', end='')
lockpython.release()
time.sleep(0.2)


# **************end*************#


if __name__ == '__main__':
lockfoo = threading.Lock() # 定义3个互斥锁
lockbar = threading.Lock()
lockpython = threading.Lock()
n = int(input())
t1 = threading.Thread(target=showfoo, args=[n]) # 定义3个线程
t2 = threading.Thread(target=showbar, args=[n])
t3 = threading.Thread(target=showpython, args=[n])
lockfoo.acquire() # 先锁住foo,bar锁,保证先打印foo
lockbar.acquire()

t1.start()
t2.start()
t3.start()

注意点:
1、锁会锁住临界资源,进程要使用这些资源必须先获取锁。
2、也可以使用锁来控制子线程的运行顺序。即在执行某代码之前,加上获取某个锁的条件,这样只有当这个锁解开之后才可以执行代码。
3、在主函数需要先获取foo、bar的锁,因为我们需要先输出foo,而要执行打印foo代码需要先获取python的锁。

单元测试

  1. 基本单位


  1. 误解进程

D :也可能是准备状态
C :在单处理器系统中,如果发生死锁,进程就全部处于阻塞态;系统中没有进程时,也没有进程处于运行态。


  1. 线程通信

D为进程通信


  1. 易错点


  1. GIL锁

https://blog.csdn.net/qq_45632139/article/details/106902316


  1. 死锁解法


  1. 区分进程和线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process  
from time import sleep
counter = 0
def sub_task(string) :
global counter
while counter < 5:
print(string, end='', flush=True)
counter += 1
sleep(0.01)
def main():
Process(target=sub_task, args=('Ping',)).start()
Process(target=sub_task, args=('Pong',)).start()
if __name__ == '__main__':
main()

上代码是进程,各进程counter不会相互影响,只是顺序会有所不同,总数一致。
下代码是线程,会共享counter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
from time import sleep
counter = 0
def sub_task(string) :
global counter
while counter < 5:
print(string, end='', flush=True)
counter += 1
sleep(0.1)
def main():
p1 = threading.Thread(target=sub_task, args=('Ping',)).start()
p2 = threading.Thread(target=sub_task, args=('Pong',)).start()
if __name__ == '__main__':
main()