第十四章:深入解析并发编程 14. 并发编程基本概念 并发编程是指程序设计中允许多个任务同时执行的编程模式,它的核心目标是 提升执行效率 。通过并发编程,原本需要 20 分钟执行的代码可能只需要 1 分钟就能完成。
进程调度机制解析 CPU 在执行程序时会涉及进程调度,主要有两种切换情况:
I/O 操作触发切换 :当程序遇到 I/O 操作时,操作系统会剥夺该程序对 CPU 的执行权限时间片用尽触发切换 :当一个程序长时间占用 CPU 时,操作系统也会剥夺程序对 CPU 的执行权限所谓 I/O 操作,指的为 阻断
程序的操作,类似于 input()
函数会将程序暂停运行,达到某一个条件后才会接触阻塞状态
时间片即 CPU 分配给各个程序的时间,每个线程被分配一个时间段,称作它的时间片,即该进程允许运行的时间,使各个程序从表面上看是同时进行的。如果在时间片结束时进程还在运行,则 CPU 将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则 CPU 当即进行切换。而不会造成 CPU 资源浪费。
在宏观上:我们可以同时打开多个应用程序,每个程序并行不悖,同时运行。
但在微观上:由于只有一个 CPU,一次只能处理程序要求的一部分,如何处理公平,一种方法就是引入时间片,每个程序轮流执行。
进程的三大状态与生命周期 进程在其生命周期中会经历三种基本状态:
首先一个程序想要被运行,当用户双击图标后,此时程序就会从硬盘加载到内存,所有的程序想要被执行就必须经历就绪态,然后等待 CPU 执行,就绪态之后会进入进程调度,然后运行
运行时会出现以下几种情况:
1.时间片运行完毕,程序也执行完毕,释放资源后退出 2.程序运行过程遇到 I/O 操作(读写、发送网络请求)它是不需要 CPU 工作的,只要运行遇到了 I/O,操作系统就会把 CPU 拿走,执行其他的时间片,程序就会进入阻塞态,当 IO 请求完成后它就会结束阻塞态,回到就绪态里排队 14.1 同步与异步编程模型 同步和异步 同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事情
异步:任务提交之后,不再等待任务的返回结果,而是去做一些其他的事情
这两个概念主要 描述任务的提交方式 :
📝 实际应用 :在 Web 开发中,同步请求会阻塞页面渲染,而异步请求(AJAX)则可以在后台处理数据,不影响用户体验。
阻塞和非阻塞 这两个概念主要 描述进程的运行状态 :
阻塞 :对应进程的阻塞态非阻塞 :对应进程的就绪态、运行态结合同步/异步和阻塞/非阻塞,可以形成四种组合:
同步阻塞 同步非阻塞 异步阻塞 异步非阻塞 (CPU 利用率最高的一种模式)🔍 在实际开发中,异步非阻塞模式是高并发系统的首选模式,因为它允许程序在等待 I/O 操作时继续执行其他任务。
14.2 多进程编程技术 进程基础 进程 是程序在计算机中的一次执行过程:
程序 是静态的可执行文件,占用磁盘空间进程 是动态的执行过程,占用计算机运行资源类比:一个工厂有三个车间,每个车间一个工人(共 3 人),并行处理任务,相当于一个程序创建三个进程,每个进程一个线程(共 3 人),并行处理任务。
进程创建方法 1 2 3 4 5 6 7 8 9 10 from multiprocessing import ProcessProcess(target,name,args,kwargs) '''''' '''''' ''''' 功能 : 创建进程对象 参数 : target 绑定要执行的目标函数 name 进程名,默认是Process-x(整数) args 元组,用于给target函数位置传参 kwargs 字典,给target函数键值传参 ''' '''''' '''''' ''''
方法一:使用 Process 类创建进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from multiprocessing import Processimport timeimport osdef worker_function (name, age ): """子进程执行过程中触发的函数""" print (f"子进程ID:{os.getpid()} ,父进程ID{os.getppid()} " ) print (f"子进程正在执行,参数name={name} ,age={age} " ) time.sleep(5 ) print (f"子进程{name} 执行完毕" ) def cpu_intensive_task (number ): """CPU密集型任务""" result = 0 for i in range (number): result += i * i print (f"进程{os.getpid()} 计算完成,结果为{result} " ) if __name__ == '__main__' : print (f"父进程ID:{os.getpid()} " ) start_time = time.time() processes = [] p1 = Process(target=worker_function, args=("张三" ,), kwargs={"age" : 20 }) p2 = Process(target=worker_function, args=("李四" ,), kwargs={"age" : 30 }) p3 = Process(target=cpu_intensive_task, args=(10000 ,)) p4 = Process(target=cpu_intensive_task, args=(20000 ,)) processes.extend([p1, p2, p3, p4]) for p in processes: p.start() for p in processes: p.join() end_time = time.time() print (f"所有进程执行完毕,总耗时{end_time - start_time:.2 f} 秒" ) print ("如果使用单进程顺序执行,耗时会更长,因为是两个任务在执行,多进程可以充分利用多核CPU并行处理任务" )
⚠️ 重要提示 :在 Windows 系统中,必须在 if __name__ == '__main__'
条件下创建进程,这是因为 Windows 使用 spawn 方式创建进程,会重新导入模块,可能导致递归创建进程。
方法二:继承 Process 类创建进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 from multiprocessing import Processimport timeimport osclass WorkerProcess (Process ): """继承Process类的自定义进程""" def __init__ (self, name, age=None ): super ().__init__() self .name = name self .age = age def run (self ): """重写run方法,进程启动后会执行该方法""" print (f"子进程ID:{os.getpid()} ,父进程ID{os.getppid()} " ) print (f"子进程正在执行,参数name={self.name} ,age={self.age} " ) time.sleep(5 ) print (f"子进程{self.name} 执行完毕" ) class CPUIntensiveProcess (Process ): """CPU密集型任务进程""" def __init__ (self, number ): super ().__init__() self .number = number def run (self ): """重写run方法,执行CPU密集型计算""" result = 0 for i in range (self .number): result += i * i print (f"进程{os.getpid()} 计算完成,结果为{result} " ) if __name__ == '__main__' : print (f"父进程ID:{os.getpid()} " ) start_time = time.time() processes = [] p1 = WorkerProcess("张三" , 20 ) p2 = WorkerProcess("李四" , 30 ) p3 = CPUIntensiveProcess(10000 ) p4 = CPUIntensiveProcess(20000 ) processes.extend([p1, p2, p3, p4]) for p in processes: p.start() for p in processes: p.join() end_time = time.time() print (f"所有进程执行完毕,总耗时{end_time - start_time:.2 f} 秒" )
多进程常用方法表 方法名 说明 实际应用场景 Process(target=...)
创建进程对象 指定新进程要执行的函数 start()
启动进程 开始执行进程的任务 join()
等待进程结束 协调多个进程的执行顺序 is_alive()
检查进程是否存活 监控进程状态 terminate()
强制终止进程 中断异常或超时的进程 Queue()
创建进程安全的队列 进程间数据传递 put(item)
添加元素到队列 向队列中放入数据 get()
从队列获取元素 从队列获取数据 Pipe()
创建管道对象 进程间双向通信
进程号与进程信息获取 在多进程编程中,获取进程信息对于调试和管理至关重要。Python 的 multiprocessing
模块提供了 current_process()
方法来获取当前进程的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import multiprocessingimport osdef process_info (): """打印当前进程的信息""" process = multiprocessing.current_process() print (f"进程名称: {process.name} " ) print (f"进程ID: {process.pid} " ) print (f"父进程ID: {os.getppid()} " ) print (f"进程授权键: {process.authkey} " ) print (f"进程是否活跃: {process.is_alive()} " ) if __name__ == '__main__' : p = multiprocessing.Process(target=process_info, name="自定义进程名" ) p.start() p.join()
进程间通信示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from multiprocessing import Process,Queueimport timedef worker (q ): """子进程函数,想往父进程发送消息,就往这个队列里放""" print ("子进程启动" ) q.put("我是一个队列数据" ) time.sleep(2 ) print ("子进程结束" ) if __name__ == '__main__' : q = Queue() p = Process(target=worker, args=(q,)) p.start() print ("主进程等待子进程数据...." ) message = q.get() print (f"主进程收到来自于子进程的消息:{message} " ) p.join() print ("主进程结束" )
复杂进程通信示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import osimport timefrom multiprocessing import Process, Queue, Lock, Value, Arraydef worker_with_args (q, lock, value, arr, sleep_num ): """带有共享资源的工作函数 Args: q: 队列 lock: 锁 value: 值 arr: 数组 sleep_num: 休眠时间 """ pid = os.getpid() print (f"进程{pid} 开始执行" ) with lock: value.value += 1 for i in range (len (arr)): arr[i] **= 2 print (f"进程{pid} 修改数组元素{i} 为{arr[i]} " ) q.put(f"这是一条来自于进程{pid} 的信息" ) time.sleep(sleep_num) print (f"进程{pid} 执行完毕" ) if __name__ == "__main__" : """ 解释输出逻辑: 1. 创建了4个进程,它们共享同一个数组arr,初始值为[1,2,3,4,5] 2. 每个进程获取锁后,会将数组中的每个元素进行平方操作(arr[i] **= 2) 3. 由于进程是按顺序启动的,但执行顺序不确定,所以: - 第一个获得锁的进程将[1,2,3,4,5]平方为[1,4,9,16,25] - 第二个获得锁的进程将[1,4,9,16,25]平方为[1,16,81,256,625] - 第三个获得锁的进程将[1,16,81,256,625]平方为[1,256,6561,65536,390625] - 第四个获得锁的进程将[1,256,6561,65536,390625]平方,但由于整数溢出, 导致最后两个元素变成了0和负数 4. 进程完成的顺序取决于sleep_num参数(1,2,3,4),所以最先完成的是第一个进程 """ q = Queue() lock = Lock() value = Value("i" , 0 ) arr = Array("i" , [1 , 2 , 3 , 4 , 5 ]) processes = [] for i in range (4 ): p = Process(target=worker_with_args, args=(q, lock, value, arr, i + 1 )) p.start() processes.extend([p])
进程池详解 进程池是一种管理多个进程的方式,可以简化并行计算的编程。Python 的 multiprocessing
模块中的 Pool
类和 concurrent.futures
模块的 ProcessPoolExecutor
类都提供了进程池功能。
进程池的主要方法 方法 描述 使用场景 Pool(processes=None)
创建进程池,进程数默认为 CPU 核数 初始化进程池 apply(func, args)
阻塞执行任务 需要顺序执行且等待结果的场景 apply_async(func, args)
非阻塞执行任务 需要异步执行的场景 map(func, iterable)
并行执行映射任务 对列表元素并行处理 close()
关闭进程池,不再接受新任务 完成任务提交后 terminate()
立即终止所有工作进程 需要强制停止时 join()
等待所有工作进程退出 在 close()后使用
ProcessPoolExecutor 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from concurrent.futures import ProcessPoolExecutorimport osdef worker_function (name,age ): """进程池工作函数""" pid = os.getpid() print (f"进程{pid} :{name} ,{age} 岁" ) return f"我的父进程是{os.getppid()} 我结束进程了" if __name__ == '__main__' : with ProcessPoolExecutor(max_workers=4 ) as executor: for i in range (10 ): future = executor.submit(worker_function, f"小明{i} " , i) result = future.result() print (result)
Pool 对象示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from multiprocessing import Poolimport osdef worker_function (args ): """工作进程函数""" name,age = args pid = os.getpid() print (f"进程{pid} :{name} ,{age} 岁" ) return f"我的父进程是{os.getppid()} 我结束进程了" if __name__ == '__main__' : with Pool() as pool: args_list = [(f"张三{i} 号" , i+18 ) for i in range (1 ,10 )] result_list = pool.map (worker_function, args_list) for result in result_list: print (result)
进程号与进程信息获取 在多进程编程中,获取进程信息对于调试和管理至关重要。Python 的 multiprocessing
模块提供了 current_process()
方法来获取当前进程的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import multiprocessingimport osdef process_info (): """打印当前进程的信息""" process = multiprocessing.current_process() print (f"进程名称: {process.name} " ) print (f"进程ID: {process.pid} " ) print (f"父进程ID: {os.getppid()} " ) print (f"进程授权键: {process.authkey} " ) print (f"进程是否活跃: {process.is_alive()} " ) if __name__ == '__main__' : p = multiprocessing.Process(target=process_info, name="自定义进程名" ) p.start() p.join()
进程状态特殊情况 僵尸进程 1 2 3 4 5 6 7 8 9 10 ''' 子进程死后还会有一些资源占用(进程号,进程的运行状态,运行时间),等待父进程通过系统调用 进行资源回收 相当于子进程死了之后,需要父进程来给他"收尸" 除了init进程之外,所有的进程最后都会步入僵尸进程 在一种情况下是会带来危害的: 子进程退出之后,父进程没有及时处理,僵尸进程就会一直占用资源 如果产生了大量僵尸进程,资源过度使用,系统没有可用的进程号,导致系统不能产生新的进程 '''
注意:在 Windows 中,子进程退出后会立即被系统回收,不会产生真正的僵尸进程,在 Windows 系统中,不需要显式调用 wait 来回收子进程资源
孤儿进程 1 2 3 4 ''' 子进程处于存活状态,父进程意外死亡,操作系统就会开设一个孤儿院(init进程),用来管理 孤儿进程,回收孤儿进程相关资源 '''
📝 知识点 :操作系统会自动处理孤儿进程,将它们的父进程更改为 init 进程(PID 为 1),所以孤儿进程不会造成资源泄漏问题。
14.3 多线程编程深入解析 线程基础 线程 是轻量级的进程,也是多任务编程的一种方式:
一个进程中可以包含多个线程 线程也是一个运行行为,消耗计算机资源 一个进程中的所有线程共享这个进程的资源 线程的创建和销毁消耗资源远小于进程 一个工厂至少有一个车间,一个车间中至少有一个工人,工人去利用车间的设备工作;
一个程序至少有一个进程,一个进程中至少有一个线程,线程去利用进程的资源工作。
线程创建方法 方法一:使用 Thread 类创建线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from threading import Threadimport timedef worker_function (name,delay ): print (f"线程{name} 开始工作" ) time.sleep(delay) print (f"线程{name} 结束工作" ) if __name__ == '__main__' : t1 = Thread(target=worker_function,args=("线程1" ,2 )) t2 = Thread(target=worker_function,args=("线程2" ,4 )) t1.start() t2.start() t1.join() t2.join() print ("主线程结束" )
方法二:继承 Thread 类创建线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from threading import Threadimport timeclass MyThread (Thread ): def __init__ (self,name,message ): super ().__init__() self .name = name self .message = message def run (self ): print (f"线程{self.name} 开始执行" ) time.sleep(2 ) print (f"线程{self.name} 执行完毕,消息:{self.message} " ) if __name__ == '__main__' : t1 = MyThread(name="线程1" ,message="子进程操作完毕" ) t2 = MyThread(name="线程2" ,message="子进程操作完毕" ) t1.start() t2.start() t1.join() t2.join() print ("主进程执行完毕" )
线程常用方法表 方法名 说明 实际应用场景 start()
启动线程 开始执行线程任务 run()
定义线程执行的任务 重写该方法自定义线程行为 join()
等待线程结束 协调线程执行顺序 join(timeout)
等待线程结束,有超时时间 防止无限等待 is_alive()
检查线程是否活动 监控线程状态 getName()
获取线程名称 调试和日志记录 setName(name)
设置线程名称 便于识别不同线程 setDaemon(T/F)
设置为守护线程 随主线程结束而结束的后台任务 isDaemon()
检查是否为守护线程 确认线程类型 getId()
获取线程 ID 唯一标识线程 current_thread
获取当前线程对象 在函数中获取当前执行线程
线程使用实例 基本线程示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from threading import Threadimport timeimport requestsdef download_file (url,session ): """访问网站下载文件""" with session.get(url) as response: print (f"读取{url} 长度为{len (response.content)} " ) def download_all_sites (urls ): """单线程下载所有网站""" with requests.Session() as session: for url in urls: download_file(url,session) time.sleep(1 ) def download_site_thread (url,session ): """多线程下载网站""" download_file(url,session) def download_all_sites_thread (urls ): """多线程下载所有网站""" threads = [] with requests.Session() as session: for url in urls: thread = Thread(target=download_site_thread, args=(url,session)) threads.append(thread) thread.start() for thread in threads: thread.join() if __name__ == '__main__' : sites = [ "https://www.baidu.com" , "https://www.sina.com.cn" , "https://www.qq.com" , "https://www.163.com" , "https://www.sohu.com" , ] print ("=======单线程下载开始========" ) start_time = time.time() download_all_sites(sites) end_time = time.time() print (f"单线程下载结束,耗时{end_time-start_time} 秒" ) print ("\n=======多线程下载开始========" ) start_time = time.time() download_all_sites_thread(sites) end_time = time.time() print (f"多线程下载结束,耗时{end_time-start_time} 秒" ) print ("\nIO密集型任务(如网络请求)适合使用多线程,可以显著提高性能" ) print ("这是因为当一个线程等待IO操作完成时,其他线程可以继续执行" )
守护线程示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from threading import Threadimport timedef worker_function (thread_name ): print (f"线程 {thread_name} 启动" ) time.sleep(3 ) print (f"线程 {thread_name} 结束" ) if __name__ == '__main__' : t1 = Thread(target=worker_function, args=("Thread-1" ,)) t2 = Thread(target=worker_function, args=("Thread-2" ,)) t3 = Thread(target=worker_function, args=("Thread-3" ,), daemon=True ) t1.start() t2.start() t3.start() print (f"等待线程{t1.name} + {t2.name} 完成..." ) t1.join() t2.join() print (f"线程{t1.name} 是否存活:{t1.is_alive()} " ) print (f"线程{t2.name} 是否存活:{t2.is_alive()} " ) print (f"线程{t3.name} 是否存活:{t3.is_alive()} " ) time.sleep(10 ) print ("主线程结束" )
💡 守护线程特性 :守护线程会随着主线程的结束而结束,不管它是否执行完成。适用于需要在后台运行但不要求必须完成的任务,如监控、日志记录等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from threading import Threadimport timeimport loggingimport psutillogging.basicConfig( level=logging.INFO, format ='%(asctime)s %(levelname)s %(message)s' , ) def system_monitor (interval=1 ): """ 守护线程:系统资源监控器 持续监控CPU使用率和内存使用情况,并记录到日志中 """ logging.info('系统监控守护线程启动' ) try : while True : cpu_usage = psutil.cpu_percent(interval=interval) mem_usage = psutil.virtual_memory().percent logging.info(f'CPU使用率:{cpu_usage} % 内存使用率:{mem_usage} %' ) time.sleep(interval) if cpu_usage > 80 or mem_usage > 80 : logging.warning('系统资源占用过高,请及时处理' ) except Exception as e: logging.error(f'系统监控线程异常:{e} ' ) finally : logging.info('系统监控守护线程结束' ) if __name__ == '__main__' : monitor_thread = Thread(target=system_monitor,args=(1 ,),daemon=True ,name="MonitorThread" ) monitor_thread.start() logging.info("主线程运行中,监控守护线程在后台运行..." ) time.sleep(30 ) logging.info(f"监控守护线程是否存活: {monitor_thread.is_alive()} " ) logging.info("主线程结束,守护线程将自动终止" )
线程池详解 线程池是一种管理线程资源的方式,它预先创建一定数量的线程,然后复用这些线程来执行任务,避免了频繁创建和销毁线程的开销。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from concurrent.futures import ThreadPoolExecutorimport timedef time_consuming_task (n ): time.sleep(1 ) return n * n if __name__ == '__main__' : start_time = time.time() with ThreadPoolExecutor(max_workers=4 ) as executor: futures = [executor.submit(time_consuming_task, i) for i in range (1 , 21 )] print ("任务已提交,主线程继续执行........." ) results = [future.result() for future in futures] print (f"任务执行完毕,结果为:{results} " ) end_time = time.time() print (f"总共耗时:{end_time - start_time} 秒" )
ThreadPoolExecutor 主要方法 方法名 简洁解释 适用场景 submit(fn, *args)
异步执行函数,返回 Future 对象 单独提交任务并获取结果 map(func, *iterables)
对每个输入并行执行函数 批量处理类似任务 shutdown(wait=True)
关闭执行器 资源释放 result()
获取任务执行结果 获取异步任务的返回值 add_done_callback(fn)
添加任务完成回调函数 任务完成后的后续处理 as_completed()
返回已完成任务的迭代器 先处理先完成的任务 wait()
等待任务完成 任务同步点
🔍 深入理解 :线程池最大的好处是控制并发数量,防止系统资源被耗尽。在实际开发中,建议将线程数设置为 CPU 核心数的 1-5 倍,具体取决于任务是 I/O 密集型还是 CPU 密集型。
完整示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 from concurrent.futures import ThreadPoolExecutor,as_completed,waitimport timedef time_consuming_task (n ): """耗时任务""" time.sleep(n % 3 +1 ) return n * n def task_done_callback (future ): """任务完成后触发的回调函数""" print (f"任务完成,结果为{future.result()} " ) if __name__ == '__main__' : print ("===== 1. submit方法示例 =====" ) start_time = time.time() with ThreadPoolExecutor(max_workers=3 ) as executor: future_list = [executor.submit(time_consuming_task, i) for i in range (10 )] print ("提交任务完成,等待结果..." ) for future in as_completed(future_list): print (f"任务{future.result()} 完成" ) end_time = time.time() print (f"总耗时:{end_time - start_time} 秒" ) print ("\n===== 2. map方法示例 =====" ) start_time = time.time() with ThreadPoolExecutor(max_workers=4 ) as executor: results = executor.map (time_consuming_task, range (10 )) print ("任务提交完成,直接获取有序结果..." ) results_list = list (results) print (f"结果:{results_list} " ) end_time = time.time() print (f"总耗时:{end_time - start_time} 秒" ) print ("\n===== 3. add_done_callback示例 =====" ) start_time = time.time() with ThreadPoolExecutor(max_workers=4 ) as executor: futures = [] for i in range (1 , 6 ): future = executor.submit(time_consuming_task, i) future.add_done_callback(task_done_callback) futures.append(future) print ("已添加回调函数,主线程继续执行..." ) for future in futures: future.result() print (f"耗时:{time.time() - start_time} 秒" ) print ("\n===== 4. wait示例 =====" ) start_time = time.time() with ThreadPoolExecutor(max_workers=4 ) as executor: futures = [executor.submit(time_consuming_task, i) for i in range (1 , 11 )] print ("任务已提交,等待所有任务完成..." ) done, not_done = wait(futures) print (f"完成的任务数: {len (done)} , 未完成的任务数: {len (not_done)} " ) results = [future.result() for future in done] print (f"所有任务执行完毕,结果为:{results} " ) print (f"耗时:{time.time() - start_time} 秒" )
Event 事件同步机制 Event 是一种线程同步机制,用于协调多个线程的执行顺序。它本质上是一个内部的标志位,线程可以等待这个标志位被设置,也可以设置或清除这个标志位。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from threading import Thread, Eventimport timeevent = Event() def bus_stop (): """模拟公交车到站过程""" print ('公交车即将到站' ) time.sleep(2 ) print ('公交车已到站<====>' *10 ) event.set () def passenger (name ): """模拟乘客等车 Args: name: 乘客名称 """ print (name, '等车中' ) event.wait() print (name, '出发!!!!!!!!!!!!!!!!!!!!!!!!!!' ) if __name__ == '__main__' : t1 = Thread(target=bus_stop) t1.start() for i in range (20 ): t = Thread(target=passenger, args=(f'乘客{i} ' ,)) t.start() time.sleep(0.1 )
Event 主要方法 方法名 描述 使用场景 set()
设置事件标志为 True 通知等待的线程继续执行 clear()
清除事件标志为 False 重置事件状态,使线程再次等待 is_set()
检查事件状态 判断事件是否已被设置 wait()
等待事件被设置 阻塞线程直到事件被设置或超时
🌟 应用场景 :Event 适合实现一次性通知多个线程的场景,比如多个工作线程等待初始化完成、多个消费者等待数据准备就绪等。在 Web 开发中,可用于协调多个后台任务的启动时机。
定时器(Timer) 定时器是线程的一个特殊应用,用于在指定时间后执行某个操作。Python 的 threading
模块提供了 Timer
类来实现这一功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from threading import Timerdef delayed_greeting (name ): """延迟执行的问候函数 Args: name: 要问候的对象名称 """ print (f"{name} 说: 哈哈,我是延迟1秒后才执行的!" ) timer = Timer(1 , delayed_greeting, args=("小明" ,)) timer.start() print ("定时器已启动,但greeting函数还未执行..." )
💡 实用技巧 :Timer 可用于实现超时处理、延迟重试、定时清理等场景。例如,在网络编程中,可以用 Timer 设置请求超时机制;在数据同步中,可以用 Timer 定期执行同步任务。
14.4 多进程 VS 多线程性能分析 在 Python 中,由于 GIL(全局解释器锁)的存在,多线程并不能真正实现并行计算。因此,根据任务特性选择合适的并发模型十分重要。
不同场景的最优选择 任务类型 多进程 多线程 推荐选择 计算密集型 效率高,可利用多核 受 GIL 限制,效率相对较低 多进程 IO 密集型 资源占用大 资源占用小,效率与多进程相当 多线程
📊 实际应用建议 :现代开发中,约 90%以上的程序属于 IO 密集型,适合使用多线程;对于数据分析、图像处理等计算密集型任务,则推荐使用多进程。也可以考虑混合使用:多进程下每个进程内再使用多线程。
计算密集型任务测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Processfrom threading import Threadimport time''' 计算密集型任务对比测试 ''' def task (): """计算密集型任务""" res = 0 for i in range (10000000 ): res += i if __name__ == '__main__' : start_time = time.time() l = [] for i in range (10 ): p = Process(target=task) p.start() l.append(p) for p in l: p.join() end = time.time() print ("花费时间" , end - start_time)
IO 密集型任务测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from multiprocessing import Processfrom threading import Threadimport time''' IO密集型任务对比测试 ''' def task (): """IO密集型任务,使用sleep模拟IO操作""" time.sleep(1 ) if __name__ == '__main__' : start_time = time.time() l = [] for i in range (100 ): p = Thread(target=task) p.start() l.append(p) for p in l: p.join() end = time.time() print ("花费时间" , end - start_time)
⚠️ 性能陷阱 :多线程在 IO 密集型任务中表现出色,但过多的线程可能导致线程切换开销增大,反而降低效率。经验值是控制线程数为 CPU 核心数的 2-4 倍。
14.5 协程技术详解 协程基础概念 协程 (Coroutine)也称为微线程,是一种用户态内的上下文切换技术,可以在单线程下实现并发效果。协程通过巧妙的编程技巧实现了程序主动让出和恢复执行的能力,使得单线程内可以 “模拟” 出并发的效果。
1 2 3 4 5 6 7 8 ''' 进程:资源单位 - 系统分配资源的基本单位,拥有独立的内存空间 线程:执行单位 - CPU调度和执行的最小单位,共享所属进程的内存空间 协程:根本不存在,它是程序员人为创造出来的(切换+保存状态) 当程序遇到IO的时候,通过我们的代码,让我们的程序自动完成切换 也就是通过代码监听IO,一旦程序遇到IO,就在代码层面自动切换,给CPU的感觉就是我们的程序没有IO 换句话说也就是我们欺骗了CPU '''
协程的核心原理是 “切换+保存状态 ”,即在多个任务之间来回切换,每次切换都保存当前任务的执行状态,下次切换回来继续执行。在 Python 中,可以通过 yield
关键字、greenlet
模块或 asyncio
库实现协程。
🔍 深入理解 :协程不是提升计算效率,而是提升 IO 效率。在 IO 密集型应用中,协程可以让 CPU 在等待 IO 的同时执行其他任务,从而提高资源利用率。协程的切换不需要操作系统参与,开销远小于线程切换。
概念 资源占用 切换开销 实现方式 适用场景 进程 高(独立内存空间) 高(涉及内存映射) 操作系统调度 CPU 密集型,需要隔离的任务 线程 中(共享内存但有独立栈) 中(上下文切换) 操作系统调度 混合型任务,兼顾计算与 IO 协程 低(共享线程内全部资源) 低(用户态切换) 程序自行控制 IO 密集型,高并发网络应用
协程效率对比 对于计算密集型任务时,使用协程反而会降低效率!
串行执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import timedef f1 (): """计算密集型函数1""" n = 0 for i in range (10000000 ): n += i def f2 (): """计算密集型函数2""" n = 0 for i in range (10000000 ): n += i start_time = time.time() f1() f2() print ("串行执行总共用时:%.2f秒" % (time.time() - start_time))
使用 yield 实现协程切换 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import timedef f1 (): """带yield的计算密集型函数1""" n = 0 for i in range (10000000 ): n += i yield def f2 (): """使用f1的生成器进行交替执行""" g = f1() n = 0 for i in range (10000000 ): n += i next (g) start_time = time.time() f2() print ("yield协程用时:%.2f秒" % (time.time() - start_time))
⚠️ 注意事项 :对于计算密集型任务,协程切换反而会增加开销,降低效率;但对于 IO 密集型任务,协程切换可以显著提高效率。这是因为在 IO 等待期间,协程可以切换到其他任务继续执行,避免了 CPU 空闲。
greenlet 模块(了解) greenlet 是一个轻量级的协程库,提供了基本的协程实现。它允许在不使用回调函数的情况下,在不同函数间来回切换执行,实现了所谓的 “确定性切换”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from greenlet import greenletimport timedef func_a (): """协程函数a""" while True : print ('函数a正在运行' ) time.sleep(1 ) b.switch() def func_b (): """协程函数b""" while True : print ('函数b正在运行' ) time.sleep(2 ) a.switch() if __name__ == '__main__' : a = greenlet(func_a) b = greenlet(func_b) a.switch()
greenlet 核心方法与属性 方法/属性名 描述 使用场景 示例 greenlet.getcurrent()
获取当前正在执行的 greenlet 对象 在函数内获取当前协程 current = greenlet.getcurrent()
greenlet.switch(value=None)
将控制权切换到另一个 greenlet 协程间的主动切换 g.switch('传递参数')
greenlet.parent
获取当前 greenlet 的父 greenlet 协程层级管理 parent = g.parent
throw(type, value=None, tb=None)
向 greenlet 对象中抛出异常 协程异常处理 g.throw(ValueError, '错误信息')
dead
判断 greenlet 是否已经执行完毕 协程状态检查 if g.dead: print('已执行完毕')
gr_frame
获取 greenlet 当前的帧对象 调试和检查协程状态 frame = g.gr_frame
run
绑定到 greenlet 的可调用对象 查看协程的目标函数 func = g.run
💡 使用技巧 :greenlet 适合实现简单的协程切换,但不支持自动在 IO 操作时切换,因此常与事件循环结合使用,如 gevent 库。greenlet 的优势在于它的轻量和灵活性,可以构建复杂的协程调度系统。
gevent 模块(了解) gevent 是基于 greenlet 的协程库,增加了事件循环和自动 IO 切换功能。它通过 “猴子补丁”(monkey patching)将标准库中的阻塞操作替换为非阻塞版本,使普通的同步代码能够以异步方式执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 ''' gevent 是一个基于协程的 Python 网络库,它使用 greenlet 在 libev 或 libuv 等事件循环之上提供高级同步 API。gevent 实现了python 标准库里面大部分的阻塞式系统调用, 包括 socket、ssl、threading 和 select 等模块, 可以使用 "猴子补丁" 将这些阻塞式调用变为协作式运行。 猴子补丁的功能很强大,但是也带来了很多的风险,尤其是像 gevent 这种直接进行 API替换的补丁, 整个 Python 进程所使用的模块都会被替换,可能自己的代码能 hold 住, 但是其它第三方库,有时候问题并不好排查,即使排查出来也是很棘手,所以, 就像松本建议的那样,如果要使用猴子补丁,那么只是做功能追加, 尽量避免大规模的 API 覆盖。 虽然猴子补丁仍然是邪恶的(evil), 但在这种情况下它是 "有用的邪恶(useful evil)"。 '''
gevent 基础操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import geventfrom gevent import monkeymonkey.patch_all() def foo (): """协程函数1""" print ('Running in foo' ) gevent.sleep(0 ) print ('Explicit context switch to foo' ) def bar (): """协程函数2""" print ('Running in bar' ) gevent.sleep(0 ) print ('Explicit context switch to bar' ) def baz (): """协程函数3""" print ('Running in baz' ) gevent.sleep(0 ) print ('Explicit context switch to baz' ) g1 = gevent.spawn(foo) g2 = gevent.spawn(bar) g3 = gevent.spawn(baz) gevent.joinall([g1, g2, g3])
gevent 常用 API 详解 方法/类名 描述 使用场景 实际应用示例 gevent.spawn(function, *args, **kwargs)
创建并运行协程 启动异步任务 启动多个 HTTP 请求并行处理 gevent.joinall(greenlets, timeout=None, raise_error=False)
等待多个协程完成 同步点,等待所有任务完成 批量处理多个数据源 gevent.sleep(seconds=0)
协程休眠并让出控制权 模拟 IO 操作,主动让出控制权 测试协程调度,防止 CPU 密集任务阻塞 gevent.wait(objects=None, timeout=None, count=None)
等待对象(协程)完成 等待部分任务完成 等待最快完成的结果 gevent.kill(greenlet, exception=GreenletExit)
终止协程 取消不需要的任务 实现任务超时取消 gevent.monkey.patch_all(socket=True, dns=True, ...)
应用猴子补丁 将同步库变为异步兼容 使用前替换标准库函数 gevent.queue.Queue
协程安全的队列 协程间通信和数据传递 生产者-消费者模式实现 gevent.event.Event
事件通知机制 协程间同步和通知 完成信号传递 gevent.pool.Pool
协程池 限制并发数量 控制网络请求并发数 gevent.select.select()
IO 多路复用 监控多个文件描述符 自定义事件循环
⚠️ 使用 gevent 注意事项 :
所有协程运行在同一线程中,不能跨线程同步数据 gevent.queue.Queue 是协程安全的,可以用于协程间通信 不能有长时间阻塞的 CPU 密集型操作,会阻塞整个事件循环 最好使用 gevent 自身的非阻塞库或已打补丁的标准库 猴子补丁会修改全局状态,可能影响第三方库的行为,应在所有导入前应用 调试协程比调试线程更困难,错误追踪可能会更复杂 实际应用场景示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import geventfrom gevent import monkeyimport requestsimport timemonkey.patch_all() def fetch_url (url ): """获取URL内容的函数 Args: url: 要获取的网址 Returns: tuple: (url, 响应状态码, 内容长度) """ try : print (f"开始请求: {url} " ) start = time.time() response = requests.get(url, timeout=5 ) elapsed = time.time() - start print (f"完成请求: {url} , 耗时: {elapsed:.2 f} 秒" ) return url, response.status_code, len (response.content) except Exception as e: print (f"请求 {url} 出错: {e} " ) return url, 0 , 0 urls = [ "https://www.python.org" , "https://www.github.com" , "https://www.stackoverflow.com" , "https://www.wikipedia.org" , "https://www.reddit.com" ] start_time = time.time() tasks = [gevent.spawn(fetch_url, url) for url in urls] gevent.joinall(tasks) results = [task.value for task in tasks] print ("\n结果汇总:" )for url, status, length in results: print (f"URL: {url} , 状态码: {status} , 内容长度: {length} 字节" ) print (f"\n总耗时: {time.time() - start_time:.2 f} 秒" )
greenlet 与 gevent 的区别与选择 特性 greenlet gevent 实际应用建议 基本原理 轻量级上下文切换 基于 greenlet,增加事件循环 简单任务用 greenlet,复杂系统用 gevent IO 处理 不提供 IO 操作支持 提供自动 IO 切换机制 网络应用选择 gevent,自定义调度选择 greenlet 切换方式 需要显式调用 switch() 在 IO 操作时自动切换 手动控制流程用 greenlet,自动化处理用 gevent 复杂度 简单,仅提供基本切换 复杂,提供完整生态系统 小型项目用 greenlet,大型项目用 gevent 适用场景 简单协程调度 高并发网络应用 Web 爬虫、API 服务、代理服务器首选 gevent 性能 轻量,开销小 比 greenlet 略重,但实用性强 极致性能用 greenlet,平衡性能和开发效率用 gevent 学习曲线 简单,容易理解 较复杂,概念较多 入门协程从 greenlet 开始,再过渡到 gevent 社区支持 基础库,更新较少 活跃,有完整生态 长期项目建议使用 gevent
🌟 选择建议 :如果只需要轻量级的上下文切换,可以使用 greenlet;如果需要处理 IO 密集型应用,特别是网络编程,建议使用 gevent。大多数实际项目中,gevent 是更好的选择,因为它提供了更完整的功能和自动化的 IO 处理。
asyncio 协程技术 随着 Python 的发展,协程技术已经有了显著进步。从 Python 3.4 引入的 asyncio
库开始,Python 对协程的原生支持不断增强。到 2025 年,Python 已经拥有更成熟、更高效的协程生态系统。
asyncio 与原生协程 Python 3.5 引入的 async/await
语法使得协程编程变得更加直观和强大,这是目前最推荐的协程实现方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import asyncioimport timeasync def fetch_data (url,delay ): """模拟从网络获取数据的异步函数""" print (f"开始获取数据:{url} ,延迟{delay} 秒" ) await asyncio.sleep(delay) print (f"成功获取数据长度:{len (url)} " ) return f"数据{url} " async def main (): """异步操作的主函数""" print (f"程序开始时间:{time.strftime('%Y-%m-%d %H:%M:%S' , time.localtime())} " ) print ("\n===== 串行执行示例 =====" ) start_time = time.time() result1 = await fetch_data("https://www.baidu.com" , 2 ) result2 = await fetch_data("https://www.sina.com.cn" ,3 ) print (f"串行执行结果:{result1} , {result2} " ) end_time = time.time() print (f"程序结束时间:{time.strftime('%Y-%m-%d %H:%M:%S' , time.localtime())} " ) print (f"程序耗时:{end_time-start_time} 秒" ) print ("\n===== 并行执行示例 =====" ) start_time = time.time() tasks = [ asyncio.create_task(fetch_data("https://www.baidu.com" , 2 )), asyncio.create_task(fetch_data("https://www.sina.com.cn" ,3 )), ] results = await asyncio.gather(*tasks) print (f"并行执行结果:{results} " ) end_time = time.time() print (f"程序结束时间:{time.strftime('%Y-%m-%d %H:%M:%S' , time.localtime())} " ) print (f"程序耗时:{end_time-start_time} 秒" ) if __name__ == '__main__' : asyncio.run(main())
asyncio 常用 API 方法/函数 描述 使用场景 示例 asyncio.run()
运行协程 程序入口点 asyncio.run(main())
asyncio.create_task()
创建任务 并行执行协程 task = asyncio.create_task(coro())
asyncio.gather()
并行运行多个协程 批量并发任务 results = await asyncio.gather(coro1(), coro2())
asyncio.wait_for()
带超时的等待 实现超时控制 await asyncio.wait_for(coro(), timeout=1.0)
asyncio.sleep()
非阻塞睡眠 模拟 IO 延迟 await asyncio.sleep(1.0)
asyncio.Queue
协程安全的队列 协程间数据传递 queue = asyncio.Queue(); await queue.put(item)
asyncio.Future
低级异步原语 自定义异步操作 future = asyncio.Future(); future.set_result(value)
asyncio.shield()
防止取消传播 保护关键协程 await asyncio.shield(critical_coro())
asyncio.as_completed()
按完成顺序返回结果 处理最先完成的任务 for task in asyncio.as_completed([coro1(), coro2()]): result = await task
Task 对象 Task
是 asyncio
中用于封装协程的对象,可以用于并发执行多个任务。可以通过 Task
对象等待协程完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioasync def nested (): print ('进入 nested()' ) await asyncio.sleep(1 ) print ('离开 nested()' ) return '42' async def main (): task = asyncio.create_task(nested()) result = await task print (f'返回值:{result} ' ) asyncio.run(main())
Future 对象 Future
是 Task
的基类,表示一个未完成的结果。在底层异步操作中,Future
常常用来表示某些未决的操作结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioasync def set_future_result (future ): await asyncio.sleep(2 ) future.set_result("Hello, world!" ) async def main (): loop = asyncio.get_running_loop() future = loop.create_future() asyncio.create_task(set_future_result(future)) result = await future print (f"Future的结果: {result} " ) asyncio.run(main())
异步上下文管理器 异步上下文管理器允许在进入和退出时执行异步操作,常用于异步资源管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioclass AsyncResource : async def __aenter__ (self ): print ("资源获取" ) return self async def __aexit__ (self, exc_type, exc_value, traceback ): print ("资源释放" ) async def main (): async with AsyncResource(): print ("执行任务中" ) asyncio.run(main())
14.6 GIL 锁与 Python 并发性能 GIL(Global Interpreter Lock,全局解释器锁)是 CPython 解释器的一个特性,它确保同一时刻只有一个线程可以执行 Python 字节码。这个特性对 Python 多线程编程有着深远影响,也是导致 Python 速度慢的两大原因之一,其另外一个原因是因为 Python 是 解释形
语言,但后续可通过 pypy
技术实现 Python 的预编译,但唯独这个原因 Python 没有解决,Python 在早期开发时为解决垃圾回收机制内部问题采用了 GIL 锁,所以 Python 程序无法直接利用多核 CPU 的优势
1 2 3 4 5 6 7 ''' GIL全局解释器锁(Global Interpreter Lock),是CPython特有的一个物件, 作用是让一个进程中同一时刻只能有一个线程可以被CPU调用 如果程序想利用计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大) 如果程序不想利用计算机的多核优势,适合用多线程开发 '''
GIL 的本质与工作原理 GIL 本质上是一把互斥锁,用于保护 Python 解释器的内部状态,主要解决了 Python 对象的内存管理问题。
GIL 特性 描述 实现方式 互斥锁(mutex) 作用对象 Python 解释器进程 控制范围 Python 字节码执行 释放时机 I/O 操作、执行固定字节码数量后 影响范围 仅影响 CPython,PyPy、Jython、IronPython 不受影响
🔍 深入理解 :GIL 并非 Python 语言本身的特性,而是 CPython 实现的产物。它解决了 CPython 简单引用计数式内存管理的线程安全问题,但也限制了多线程程序利用多核性能的能力。
1 2 3 4 5 6 7 8 9 def thread_execution (): while True : acquire_GIL() execute_bytecodes() release_GIL() wait_for_GIL()
并发与并行的区别 并发(Concurrency)和并行(Parallelism)是两个在计算机科学中经常出现的概念,虽然常被混用,但有着本质区别:
特性 并发(Concurrency) 并行(Parallelism) 定义 多个任务在同一时间间隔内发生 多个任务在同一时刻发生 重点 任务切换与调度 任务的同时执行 资源需求 可以在单处理器上通过时间片轮转实现 需要多个处理器或核心 执行方式 任务交替执行,共享处理器时间 每个任务有独立的处理器同时执行 适用场景 I/O 密集型任务,如网络请求、文件读写 计算密集型任务,如图像处理、科学计算 实现难度 相对简单,关注任务调度 相对复杂,需考虑数据分割、同步和合并 Python 实现 多线程、协程 多进程
🌟 关键理解 :由于 GIL 的存在,Python 的多线程实际上只能实现并发,而不能实现真正的并行。要实现并行,需要使用多进程或依赖不受 GIL 限制的扩展库(如使用 C 扩展的 NumPy)。
线程安全与并发控制 线程安全指在多线程环境下,程序能够正确地处理共享资源,不会因为多线程同时访问而导致数据不一致。尽管 Python 的 GIL 能减轻一些并发问题,但并不能完全保证线程安全。
线程安全问题示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 from threading import Threadimport timeimport randomcounter = 0 start_time = time.time() iterations_completed = 0 def increment_counter (): """增加计数器,但使用了非原子操作的方式""" global counter, iterations_completed for _ in range (1000000 ): local_counter = counter if random.random() < 0.00001 : time.sleep(0.00001 ) local_counter += 1 counter = local_counter iterations_completed += 1 def run_concurrent_threads (num_threads ): """运行多个线程同时增加计数器""" global counter, iterations_completed counter = 0 iterations_completed = 0 threads = [] for _ in range (num_threads): t = Thread(target=increment_counter) threads.append(t) t.start() for t in threads: t.join() expected = num_threads * 1000000 print (f"预期结果: {expected} " ) print (f"实际结果: {counter} " ) print (f"丢失的增量: {expected - counter} " ) print (f"完成的迭代次数: {iterations_completed} " ) if __name__ == '__main__' : print (f'开始时间为: {time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime())} ' ) run_concurrent_threads(4 ) print (f"累计用时: {round (time.time() - start_time, 1 )} 秒" )
使用线程锁解决安全问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 from threading import Thread, Lockimport timeimport randomcounter = 0 start_time = time.time() iterations_completed = 0 counter_lock = Lock() def increment_counter (): """增加计数器,使用线程锁确保线程安全""" global counter, iterations_completed for _ in range (1000000 ): with counter_lock: counter += 1 iterations_completed += 1 def run_concurrent_threads (num_threads ): """运行多个线程同时增加计数器""" global counter, iterations_completed counter = 0 iterations_completed = 0 threads = [] for _ in range (num_threads): t = Thread(target=increment_counter) threads.append(t) t.start() for t in threads: t.join() expected = num_threads * 1000000 print (f"预期结果: {expected} " ) print (f"实际结果: {counter} " ) print (f"丢失的增量: {expected - counter} " ) print (f"完成的迭代次数: {iterations_completed} " ) if __name__ == '__main__' : print (f'开始时间为: {time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime())} ' ) run_concurrent_threads(4 ) print (f"累计用时: {round (time.time() - start_time, 1 )} 秒" )
🔒 线程锁作用与注意事项 :
锁确保同一时刻只有一个线程能访问共享资源 锁会影响性能,特别是在竞争激烈的情况下 锁的粒度需要权衡:粒度太细会增加锁操作开销,太粗会降低并发度 锁可能引发死锁问题,需谨慎设计锁的获取顺序 Python 中的锁机制全面解析 Python 的 threading
模块提供了多种锁和同步原语,用于不同并发控制场景。深入理解这些锁的特性和适用场景,对于开发可靠的并发程序至关重要。
Python 锁类型及其特性 锁类型 描述 独占性 可重入性 公平性 注意事项 threading.Lock
基本互斥锁 是 否 非公平 最简单的锁,同一线程不能重复获取 threading.RLock
可重入锁 是 是 非公平 同一线程可多次获取,必须对应释放相同次数 threading.Condition
条件变量 - - 非公平 基于锁实现,提供 wait/notify 机制 threading.Semaphore
信号量 否 - 非公平 限制资源访问线程数量 threading.BoundedSemaphore
有界信号量 否 - 非公平 限制资源数量,防止过度释放 threading.Event
事件对象 - - - 用于线程间通知而非资源控制 threading.Barrier
栅栏对象 - - - 使多个线程同步到达某点再继续 queue.Queue
线程安全队列 - - 先进先出 内部带锁,用于线程间数据传递 multiprocessing.Lock
进程锁 是 否 非公平 用于进程间同步的锁 asyncio.Lock
异步锁 是 否 - 用于协程间的同步
互斥锁(Lock) 互斥锁是最基本的锁类型,它确保同一时刻只有一个线程可以访问受保护的资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import concurrentimport threadingimport timefrom concurrent.futures import ThreadPoolExecutorlock = threading.Lock() shared_data = 0 def shared_resource (thread_id: int ): """访问共享资源的函数 Args:thread_id: 线程ID,用于标识不同线程 """ if lock.acquire(timeout=1 ): print (f"线程{thread_id} 获取锁" ) try : global shared_data current = shared_data time.sleep(0.1 ) shared_data = current + 1 print (f"线程{thread_id} 修改共享数据,当前值为{shared_data} " ) finally : lock.release() print (f"线程{thread_id} 释放锁" ) else : print (f"线程{thread_id} 获取锁失败" ) def shared_resource2 (thread_id: int ): """访问共享资源的函数 使用with语句,自动释放锁简化代码 """ with lock: print (f"线程{thread_id} 获取锁" ) try : global shared_data current = shared_data time.sleep(0.1 ) shared_data = current + 1 print (f"线程{thread_id} 修改共享数据,当前值为{shared_data} " ) finally : print (f"线程{thread_id} 释放锁" ) if __name__ == '__main__' : with ThreadPoolExecutor(max_workers=10 ) as executor: futures = [executor.submit(shared_resource, i) for i in range (1 , 11 )] concurrent.futures.wait(futures) print (f"最终共享数据值为{shared_data} " )
Lock 方法 描述 参数 返回值 acquire(blocking=True, timeout=-1)
获取锁 blocking: 是否阻塞, timeout: 超时时间(秒) 布尔值,表示是否获取成功 release()
释放锁 无 无,如果当前线程未持有锁则抛出 RuntimeError locked()
检查锁状态 无 布尔值,表示锁是否被某个线程持有 __enter__()
支持 with 语句 无 锁对象自身 __exit__()
with 语句退出时调用 异常信息 无,自动释放锁
可重入锁(RLock) 可重入锁允许同一个线程多次获取该锁,而不会导致自我死锁。这在递归调用或者嵌套加锁场景中特别有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import threadingimport timerlock = threading.RLock() data = [[1 , 2 , 3 ], [4 , 5 , 6 ], [7 , 8 , 9 ]] def process_data (item, depth: int = 0 ): """递归处理数据 Args: item: 要处理的数据项,可以是列表或单个元素 depth: 当前递归深度,用于缩进显示 """ rlock.acquire() try : indent = " " * depth * 2 print (f'{indent} 线程 {threading.current_thread().name} 处理: {item} (深度: {depth} )' ) if isinstance (item, list ): print (f'{indent} ├── 发现列表,开始遍历子元素...' ) for i, sub_item in enumerate (item): prefix = "└── " if i == len (item) - 1 else "├── " print (f'{indent} {prefix} 处理子项 {i+1 } /{len (item)} : {sub_item} ' ) process_data(sub_item, depth + 1 ) time.sleep(0.1 ) else : print (f'{indent} └── 发现元素,进行处理...' ) time.sleep(0.5 ) print (f'{indent} 处理结果: {item * 2 } ' ) finally : rlock.release() if __name__ == '__main__' : threads = [] for i in range (3 ): t = threading.Thread(name=f"Thread-{i} " , target=process_data, args=(data,)) threads.append(t) t.start() time.sleep(0.5 ) for t in threads: t.join() print ("所有线程都结束了" )
RLock 方法 描述 与 Lock 的区别 acquire(blocking=True, timeout=-1)
获取锁 记录获取线程 ID 和次数 release()
释放锁 计数器减 1,只有为 0 时才真正释放 _is_owned()
检查当前线程是否持有锁 Lock 没有此方法
💡 使用建议 :一般推荐使用 RLock 而非 Lock,因为它更安全、更灵活,即使在不需要重入功能的场景下也不会有明显性能损失。
条件变量(Condition) - 根据条件控制锁 条件变量是一种高级的 同步原语(同步原语就是让多个线程能够"和谐相处"的机制)
,它允许线程等待特定条件满足后再继续执行。条件变量内部包含一个锁,用于控制对共享状态的访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import threadingimport timeimport randomfrom typing import List , Any class Buffer : """线程安全的缓冲区,使用条件变量控制生产者消费者模型""" def __init__ (self, max_size: int = 5 ) -> None : """初始化缓冲区""" self .buffer: List [Any ] = [] self .max_size: int = max_size self .condition: threading.Condition = threading.Condition() def produce (self, item: Any , producer_id: int ) -> None : """生产者方法,向缓冲区添加数据""" with self .condition: while len (self .buffer) >= self .max_size: print (f"生产者 {producer_id} : 缓冲区已满,等待消费者..." ) self .condition.wait() self .buffer.append(item) print (f"生产者 {producer_id} : 添加 {item} 到缓冲区,当前大小: {len (self.buffer)} " ) self .condition.notify_all() def consume (self, consumer_id: int ) -> Any : """消费者方法,从缓冲区获取数据""" with self .condition: while len (self .buffer) == 0 : print (f"消费者 {consumer_id} : 缓冲区为空,等待生产者..." ) self .condition.wait() item = self .buffer.pop(0 ) print (f"消费者 {consumer_id} : 从缓冲区取出 {item} ,当前大小: {len (self.buffer)} " ) self .condition.notify_all() return item def producer_task (buffer: Buffer, producer_id: int ) -> None : """生产者任务""" for i in range (10 ): item = f"产品-{producer_id} -{i} " time.sleep(random.uniform(0.1 , 0.5 )) buffer.produce(item, producer_id) def consumer_task (buffer: Buffer, consumer_id: int ) -> None : """消费者任务""" for _ in range (7 ): time.sleep(random.uniform(0.2 , 0.7 )) item = buffer.consume(consumer_id) print (f"消费者 {consumer_id} 处理 {item} " ) def main () -> None : """主函数,创建并启动生产者和消费者线程""" shared_buffer = Buffer(max_size=3 ) producer_threads = [ threading.Thread(target=producer_task, args=(shared_buffer, i), name=f"Producer-{i} " ) for i in range (3 ) ] consumer_threads = [ threading.Thread(target=consumer_task, args=(shared_buffer, i), name=f"Consumer-{i} " ) for i in range (3 ) ] all_threads = producer_threads + consumer_threads for thread in all_threads: thread.start() for thread in all_threads: thread.join() print ("所有生产和消费任务已完成" ) if __name__ == "__main__" : main()
Condition 方法 描述 参数 注意事项 __init__(lock=None)
初始化条件变量 lock: 可选的 Lock 或 RLock 不指定则创建 RLock acquire(*args)
获取底层锁 同底层锁的 acquire 方法 一般通过 with 语句使用 release()
释放底层锁 无 一般通过 with 语句自动释放 wait(timeout=None)
等待条件 timeout: 超时时间(秒) 调用前必须已获得锁 wait_for(predicate, timeout=None)
等待直到条件为真 predicate: 条件函数, timeout: 超时时间 简化循环等待模式 notify(n=1)
唤醒 n 个等待的线程 n: 要唤醒的线程数 不会立即释放锁 notify_all()
唤醒所有等待的线程 无 适用于广播通知
⚠️ 使用注意 :
调用 wait()
会释放锁,允许其他线程修改条件状态 使用 wait_for()
可以避免虚假唤醒问题 调用 notify()
后锁不会立即释放,需要当前线程退出 with 块 使用 notify_all()
而非 notify()
可以避免信号丢失问题 信号量(Semaphore) - 控制并发数量 信号量是一种计数器,用于控制同时访问特定资源的线程数量,常用于限制并发访问数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import threadingimport timeimport randomfrom concurrent.futures import ThreadPoolExecutorpool_semaphore = threading.Semaphore(3 ) resource_pool = ['资源A' , '资源B' , '资源C' ] resource_in_use = {} resource_lock = threading.RLock() def worker (worker_id: int ): """工作线程函数,模拟使用受限资源""" print (f"工作线程 {worker_id} 等待获取资源..." ) with pool_semaphore: print (f"工作线程 {worker_id} 获取资源信号量" ) with resource_lock: available_resources = [r for r in resource_pool if r not in resource_in_use.values()] if not available_resources: print (f"工作线程 {worker_id} 没有找到可用资源,理论上不应该发生!" ) return resource_name = available_resources[0 ] resource_in_use[worker_id] = resource_name print (f"工作线程 {worker_id} 分配到资源: {resource_name} , 当前使用情况: {resource_in_use} " ) try : work_time = random.uniform(0.5 , 2.0 ) print (f"工作线程 {worker_id} 使用资源 {resource_name} 时间: {work_time} 秒" ) time.sleep(work_time) finally : with resource_lock: released_resource = resource_in_use.pop(worker_id) print (f"工作线程 {worker_id} 释放资源: {released_resource} , 当前使用情况: {resource_in_use} " ) if __name__ == '__main__' : with ThreadPoolExecutor(max_workers=3 ) as executor: futures = [executor.submit(worker, i) for i in range (3 )] for future in futures: future.result()
Semaphore 方法 描述 参数 返回值 __init__(value=1)
初始化信号量 value: 初始计数器值 无 acquire(blocking=True, timeout=None)
获取信号量 blocking: 是否阻塞, timeout: 超时时间 布尔值,表示是否获取成功 release(n=1)
释放信号量 n: 释放的数量 无 __enter__()
支持 with 语句 无 信号量对象自身 __exit__()
with 语句退出时调用 异常信息 无,自动释放信号量
有界信号量(BoundedSemaphore)详解 有界信号量是信号量的一个变种,它会检查释放操作是否会导致计数器超过初始值,如果超过则抛出异常。这可以帮助检测程序中的信号量使用错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import threadingimport timebounded_semaphore = threading.BoundedSemaphore(3 ) def semaphore_demo (): """演示有界信号量与普通信号量的区别""" try : print ("获取信号量1次" ) bounded_semaphore.acquire() print ("获取信号量2次" ) bounded_semaphore.acquire() print ("获取信号量3次" ) bounded_semaphore.acquire() print ("信号量已用完,再获取将阻塞" ) print ("释放信号量1次" ) bounded_semaphore.release() print ("释放信号量2次" ) bounded_semaphore.release() print ("释放信号量3次" ) bounded_semaphore.release() try : print ("尝试额外释放一次" ) bounded_semaphore.release() print ("这一行不会执行" ) except ValueError as e: print (f"捕获预期异常: {e} " ) except Exception as e: print (f"意外错误: {e} " ) semaphore_demo()
🔍 Semaphore vs BoundedSemaphore :
Semaphore
允许无限制地调用 release()
,即使计数器超过初始值BoundedSemaphore
在计数器超过初始值时会抛出 ValueError
异常生产环境推荐使用 BoundedSemaphore
,或安全的使用 with 语句,保证程序安全 事件对象(Event) 事件对象是最简单的线程通信机制之一,它允许一个线程发送信号给其他线程,适合简单的 “一次性通知” 场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import threadingimport timeimport randomfrom typing import List from concurrent.futures import ThreadPoolExecutorstart_event = threading.Event() results: List [str ] = [] results_lock = threading.RLock() def worker (worker_id:int ) -> None : """工作线程函数,等待开始信号""" prep_time = random.uniform(0.5 , 1.5 ) time.sleep(prep_time) print (f"工作线程{worker_id} 准备完毕,等待开始信号" ) start_event.wait() print (f"工作线程{worker_id} 开始工作" ) work_time = random.uniform(1 , 2 ) time.sleep(work_time) with results_lock: results.append(f"工作线程{worker_id} 完成工作" ) print (f"工作线程{worker_id} 完成工作,用时{work_time:.2 f} 秒" ) if __name__ == '__main__' : with ThreadPoolExecutor(max_workers=3 ) as executor: futures = [executor.submit(worker, i) for i in range (3 )] time.sleep(2 ) print ("发送开始信号" ) start_event.set () for future in futures: future.result() print ("所有工作线程完成" ) print (results)
Event 方法 描述 参数 返回值 set()
设置事件,唤醒所有等待的线程 无 无 clear()
清除事件标志 无 无 is_set()
判断事件是否已设置 无 布尔值 wait(timeout=None)
等待事件被设置 timeout: 超时时间 如果超时返回 False,否则返回 True
💡 使用场景 :
启动信号:所有线程等待统一开始 停止信号:通知所有线程停止工作 一次性通知:当某条件满足时通知等待线程 栅栏对象(Barrier) 栅栏是一种同步原语,它要求固定数量的线程都到达栅栏点后,才允许所有线程继续执行。这对于分阶段任务的同步特别有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import threadingimport timeimport randomnum_parties = 4 barrier = threading.Barrier(num_parties) def worker (worker_id ): """工作线程函数,模拟多阶段工作 Args: worker_id: 工作线程ID """ print (f"工作线程 {worker_id} 开始第一阶段工作" ) work_time = random.uniform(0.5 , 2.0 ) time.sleep(work_time) print (f"工作线程 {worker_id} 完成第一阶段,用时 {work_time:.2 f} 秒,等待其他线程..." ) try : barrier.wait() print (f"工作线程 {worker_id} 通过第一个栅栏,开始第二阶段" ) work_time = random.uniform(0.5 , 2.0 ) time.sleep(work_time) print (f"工作线程 {worker_id} 完成第二阶段,用时 {work_time:.2 f} 秒,等待其他线程..." ) barrier.wait() print (f"工作线程 {worker_id} 通过第二个栅栏,工作全部完成" ) except threading.BrokenBarrierError: print (f"工作线程 {worker_id} 检测到栅栏被破坏" ) threads = [] for i in range (num_parties): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() for t in threads: t.join() print ("所有工作阶段已完成" )
Barrier 方法 描述 参数 返回值 __init__(parties, action=None, timeout=None)
初始化栅栏 parties: 参与方数量, action: 所有线程到达时执行的回调, timeout: 等待超时 无 wait(timeout=None)
等待所有参与方到达 timeout: 覆盖默认超时时间 线程的到达序号(0 ~ n-1) reset()
将栅栏重置到初始状态 无 无,正在等待的线程会抛出 BrokenBarrierError abort()
将栅栏置于损坏状态 无 无,所有等待线程会抛出 BrokenBarrierError parties
参与方数量(属性) 无 整数 n_waiting
当前等待的线程数(属性) 无 整数 broken
栅栏是否处于损坏状态(属性) 无 布尔值
⚠️ 注意事项 :
如果等待超时,栅栏会进入损坏状态 如果等待时的线程被中断,栅栏也会损坏 可以通过 reset()
方法重新使用已损坏的栅栏 线程安全队列(Queue) queue
模块提供的 Queue
类是一个线程安全的队列实现,通常用于线程间的数据传递和任务分发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import threadingimport queueimport timeimport randomfrom concurrent.futures import ThreadPoolExecutortask_queue = queue.Queue(maxsize=10 ) result_queue = queue.Queue() exit_flag = threading.Event() def producer (): """生产者线程,产生任务""" for i in range (20 ): task = f"任务-{i} " task_queue.put(task) print (f"生产者: 添加 {task} 到队列,当前队列大小: {task_queue.qsize()} " ) time.sleep(random.uniform(0.1 , 0.3 )) print ("生产者: 所有任务已产生,设置退出标志" ) exit_flag.set () def consumer (consumer_id ): """消费者线程,处理任务""" while not exit_flag.is_set() or not task_queue.empty(): try : task = task_queue.get(timeout=1 ) print (f"消费者 {consumer_id} : 开始处理 {task} " ) process_time = random.uniform(0.5 , 1.5 ) time.sleep(process_time) result = f"结果-{task} -耗时{process_time:.2 f} 秒" result_queue.put((consumer_id, result)) task_queue.task_done() print (f"消费者 {consumer_id} : 完成处理 {task} " ) except queue.Empty: if exit_flag.is_set(): break print (f"消费者 {consumer_id} : 队列暂时为空,等待任务..." ) time.sleep(0.5 ) print (f"消费者 {consumer_id} : 退出" ) if __name__ == '__main__' : with ThreadPoolExecutor(max_workers=4 ) as executor: producers = [executor.submit(producer) for _ in range (2 )] consumers = [executor.submit(consumer, i) for i in range (2 )] all_futures = producers + consumers for future in all_futures: future.result() print ("\n处理结果:" ) while not result_queue.empty(): consumer_id, result = result_queue.get() print (f"消费者 {consumer_id} 的结果: {result} " )
Queue 方法/属性 描述 参数 返回值/特性 __init__(maxsize=0)
初始化队列 maxsize: 队列最大大小,0 表示无限 无 put(item, block=True, timeout=None)
放入元素 item: 元素, block: 是否阻塞, timeout: 超时时间 无,队列满时可能阻塞或抛出 Full 异常 get(block=True, timeout=None)
获取元素 block: 是否阻塞, timeout: 超时时间 队列元素,队列空时可能阻塞或抛出 Empty 异常 task_done()
标记任务完成 无 无 join()
等待队列中所有任务处理完成 无 无 qsize()
返回队列大小 无 整数 empty()
检查队列是否为空 无 布尔值 full()
检查队列是否已满 无 布尔值 put_nowait(item)
非阻塞版本的 put item: 元素 无,队列满时抛出 Full 异常 get_nowait()
非阻塞版本的 get 无 队列元素,队列空时抛出 Empty 异常
💡 Queue 变种 :
queue.LifoQueue
: 后进先出队列(栈)queue.PriorityQueue
: 优先级队列,元素为(优先级, 数据)元组queue.SimpleQueue
: 简单的无界队列,不支持 task_done 和 join死锁问题分析与解决 死锁是指两个或多个线程互相等待对方释放资源,导致程序无法继续执行的情况。
死锁示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import threadingimport timelock_1 = threading.Lock() lock_2 = threading.Lock() def task1 (): """第一个任务,先获取lock_1,再获取lock_2""" print ("任务1开始尝试获取锁..." ) lock_1.acquire() print ("任务1获取到lock_1" ) time.sleep(0.5 ) print ("任务1尝试获取lock_2" ) lock_2.acquire() try : print ("任务1同时获取了两把锁" ) finally : lock_2.release() print ("任务1释放了lock_2" ) lock_1.release() print ("任务1释放了lock_1" ) def task2 (): """第二个任务,先获取lock_2,再获取lock_1""" print ("任务2开始尝试获取锁..." ) lock_2.acquire() print ("任务2获取到lock_2" ) time.sleep(0.5 ) print ("任务2尝试获取lock_1" ) lock_1.acquire() try : print ("任务2同时获取了两把锁" ) finally : lock_1.release() print ("任务2释放了lock_1" ) lock_2.release() print ("任务2释放了lock_2" ) if __name__ == '__main__' : t1 = threading.Thread(target=task1) t2 = threading.Thread(target=task2) t1.start() t2.start() time.sleep(5 ) print (f"线程1状态: {'活跃' if t1.is_alive() else '已结束' } " ) print (f"线程2状态: {'活跃' if t2.is_alive() else '已结束' } " ) if t1.is_alive() and t2.is_alive(): print ("检测到可能的死锁情况!" )
⚠️ 死锁的四个必要条件 :
互斥条件 :资源不能被共享,一次只能被一个线程使用请求与保持条件 :线程已获得资源,但又提出新的资源请求不剥夺条件 :线程已获得的资源不能强制被剥夺循环等待条件 :线程之间形成头尾相接的循环等待资源关系死锁解决方案 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import threadingimport timelock_1 = threading.Lock() lock_2 = threading.Lock() def acquire_locks_safe (lock_a, lock_b, thread_name ): """安全地获取两个锁,使用超时机制避免死锁 Args: lock_a: 第一个锁 lock_b: 第二个锁 thread_name: 线程名称 Returns: bool: 是否成功获取两个锁 """ while True : got_lock_a = lock_a.acquire(timeout=1 ) if got_lock_a: print (f"{thread_name} : 获取到第一个锁" ) try : got_lock_b = lock_b.acquire(timeout=1 ) if got_lock_b: print (f"{thread_name} : 获取到第二个锁" ) return True print (f"{thread_name} : 获取第二个锁失败,释放第一个锁并重试" ) finally : if not got_lock_b: lock_a.release() time.sleep(0.1 ) else : print (f"{thread_name} : 获取第一个锁失败,重试" ) time.sleep(0.1 ) def task1_fixed (): """修复死锁的任务1 - 使用安全获取锁函数""" print ("任务1开始执行..." ) if acquire_locks_safe(lock_1, lock_2, "任务1" ): try : print ("任务1: 同时持有两把锁,执行关键代码" ) time.sleep(0.5 ) finally : lock_2.release() print ("任务1: 释放lock_2" ) lock_1.release() print ("任务1: 释放lock_1" ) else : print ("任务1: 无法获取所需的锁,任务取消" ) def task2_fixed (): """修复死锁的任务2 - 使用一致的锁获取顺序""" print ("任务2开始执行..." ) if acquire_locks_safe(lock_1, lock_2, "任务2" ): try : print ("任务2: 同时持有两把锁,执行关键代码" ) time.sleep(0.5 ) finally : lock_2.release() print ("任务2: 释放lock_2" ) lock_1.release() print ("任务2: 释放lock_1" ) else : print ("任务2: 无法获取所需的锁,任务取消" ) t1 = threading.Thread(target=task1_fixed) t2 = threading.Thread(target=task2_fixed) t1.start() t2.start() t1.join() t2.join() print ("所有线程执行完毕,没有死锁" )
🛠️ 死锁预防方法 :
按顺序获取锁 :使所有线程按相同顺序获取锁超时机制 :使用 acquire(timeout=N)
设置获取锁的超时时间一次性获取所有锁 :创建更高级别的锁来同时获取多个锁使用显式资源分级 :为资源分配层级,只允许按层级顺序获取避免嵌套锁 :设计简化的锁策略,减少同时持有多个锁的情况使用 with
语句 :确保锁在异常情况下也能被释放原子操作与锁优化 在并发编程中,原子操作是指不可被中断的操作,它们要么完全执行,要么完全不执行。Python 提供了一些原子操作工具,可以减少对锁的依赖。
threading.local
对象 - 线程本地存储线程本地存储提供了一种每个线程拥有自己独立数据副本的机制,避免了共享状态带来的并发问题。
我们可以往 threading.local()上挂载对象,这样我们的每一个线程就会有属于自己的独立数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import threadingimport timeimport randomfrom concurrent.futures import ThreadPoolExecutorthread_local_data = threading.local() def process_request (request_id: int ) -> None : """处理请求的工作函数""" thread_local_data.user_id = f"user-{random.randint(1000 , 9999 )} " thread_local_data.request = request_id thread_local_data.start_time = time.time() print (f"请求 {request_id} : 开始处理 [线程: {threading.current_thread().name} , 用户: {thread_local_data.user_id} ]" ) process_stage("验证" ) process_stage("处理" ) process_stage("响应" ) elapsed = time.time() - thread_local_data.start_time print (f"请求 {request_id} : 完成处理,总耗时 {elapsed:.2 f} 秒 [线程: {threading.current_thread().name} ]" ) def process_stage (stage_name: str ): """处理请求的某个阶段""" request_id = thread_local_data.request user_id = thread_local_data.user_id time.sleep(random.uniform(0.1 , 0.5 )) print (f"请求 {request_id} : {stage_name} 阶段完成 [用户: {user_id} ]" ) if __name__ == '__main__' : with ThreadPoolExecutor(max_workers=10 ) as executor: futures = [executor.submit(process_request, i) for i in range (10 )] for future in futures: future.result() print ("所有请求处理完成" )
functools.lru_cache
装饰器提供了一个线程安全的缓存机制,当一个函数的计算逻辑十分复杂,我们就可以采用缓存来优化这一点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import functoolsimport time@functools.lru_cache(maxsize=128 ) def fibonacci (n ): """计算斐波那契数列的第n个数,使用LRU缓存优化性能""" if n <= 1 : return n return fibonacci(n-1 ) + fibonacci(n-2 ) def demonstrate_lru_cache (): """演示LRU缓存的效果""" def fibonacci_no_cache (n ): if n <= 1 : return n return fibonacci_no_cache(n-1 ) + fibonacci_no_cache(n-2 ) n = 35 start = time.time() result1 = fibonacci_no_cache(n) end = time.time() print (f"无缓存计算fibonacci({n} ) = {result1} ,耗时: {end - start:.4 f} 秒" ) start = time.time() result2 = fibonacci(n) end = time.time() print (f"首次使用缓存计算fibonacci({n} ) = {result2} ,耗时: {end - start:.4 f} 秒" ) start = time.time() result3 = fibonacci(n) end = time.time() print (f"再次使用缓存计算fibonacci({n} ) = {result3} ,耗时: {end - start:.8 f} 秒" ) print (f"缓存信息: {fibonacci.cache_info()} " ) if __name__ == "__main__" : demonstrate_lru_cache()
锁的高级应用模式 读写锁模式 Python 中的读写锁(Read-Write Lock)主要用于在多线程环境中控制对共享资源的访问。它允许多个线程同时读取共享数据,但在写操作时,其他线程不能进行读或写操作。具体的应用场景包括:
数据共享与并发读取 :当多个线程需要读取同一份数据时,使用读锁可以提高并发性,允许多个线程同时访问数据,而不需要每次访问都加锁。写操作的独占性 :当有线程进行写操作时,需要获取写锁,这样可以确保写操作的独占性,避免数据竞争和不一致性。性能优化 :在读多写少的场景下,读写锁能提高性能,因为它允许多个线程并行读取数据,而只有在写入时才会阻塞其他线程。我们先从 Python 原生实现读写锁来作为演示,掌握了原生的方式,我们可以使用 readerwriterlock
第三方库来帮我们快速实现读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 import threadingimport timeimport randomclass ReadWriteLock : """读写锁实现 允许多个读取者同时访问,或单个写入者独占访问 """ def __init__ (self ): """初始化读写锁""" self ._read_ready = threading.Condition(threading.RLock()) self ._readers = 0 self ._writers = 0 self ._write_waiting = 0 self ._writer = None def acquire_read (self ): """获取读锁""" with self ._read_ready: while self ._writers > 0 or self ._write_waiting > 0 : self ._read_ready.wait() self ._readers += 1 def release_read (self ): """释放读锁""" with self ._read_ready: self ._readers -= 1 if self ._readers == 0 : self ._read_ready.notify_all() def acquire_write (self ): """获取写锁""" me = threading.get_ident() with self ._read_ready: self ._write_waiting += 1 while self ._readers > 0 or self ._writers > 0 : self ._read_ready.wait() self ._write_waiting -= 1 self ._writers += 1 self ._writer = me def release_write (self ): """释放写锁""" with self ._read_ready: if self ._writer != threading.get_ident(): raise RuntimeError("释放未持有的写锁" ) self ._writers -= 1 self ._writer = None self ._read_ready.notify_all() class ReadLock : def __init__ (self, rw_lock ): self .rw_lock = rw_lock def __enter__ (self ): self .rw_lock.acquire_read() return self def __exit__ (self, exc_type, exc_val, exc_tb ): self .rw_lock.release_read() class WriteLock : def __init__ (self, rw_lock ): self .rw_lock = rw_lock def __enter__ (self ): self .rw_lock.acquire_write() return self def __exit__ (self, exc_type, exc_val, exc_tb ): self .rw_lock.release_write() def read_lock (self ): """获取读锁上下文管理器""" return self .ReadLock(self ) def write_lock (self ): """获取写锁上下文管理器""" return self .WriteLock(self ) shared_data = {'count' : 0 , 'values' : []} rw_lock = ReadWriteLock() def reader (reader_id ): """读取者线程 Args: reader_id: 读取者ID """ for _ in range (5 ): with rw_lock.read_lock(): count = shared_data['count' ] values = shared_data['values' ].copy() time.sleep(random.uniform(0.05 , 0.1 )) print (f"读取者 {reader_id} : 读取到 count={count} , values={values} " ) time.sleep(random.uniform(0.1 , 0.3 )) def writer (writer_id ): """写入者线程 Args: writer_id: 写入者ID """ for i in range (3 ): new_value = writer_id * 100 + i with rw_lock.write_lock(): shared_data['count' ] += 1 shared_data['values' ].append(new_value) time.sleep(random.uniform(0.1 , 0.2 )) print (f"写入者 {writer_id} : 更新为 count={shared_data['count' ]} , values={shared_data['values' ]} " ) time.sleep(random.uniform(0.3 , 0.7 )) readers = [threading.Thread(target=reader, args=(i,)) for i in range (5 )] writers = [threading.Thread(target=writer, args=(i,)) for i in range (3 )] all_threads = readers + writers for thread in all_threads: thread.start() for thread in all_threads: thread.join() print (f"最终数据: count={shared_data['count' ]} , values={shared_data['values' ]} " )
使用 readerwriterlock
库实现读写锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 """ 读写锁实现示例 readerwriterlock库提供了三种读写锁实现: - RWLockRead:读者优先(第一读者-写者问题) - RWLockWrite:写者优先(第二读者-写者问题) - RWLockFair:公平优先(第三读者-写者问题) 每种锁都有对应的可降级版本(带D后缀),允许将锁从写模式降级到读模式 """ import threadingimport timefrom readerwriterlock import rwlockrw_lock = rwlock.RWLockFairD() shared_data = {'count' : 0 , 'values' : []} def read_demo (reader_id, sleep_time=0.5 ): """用于演示的读取函数""" read_lock = rw_lock.gen_rlock() try : with read_lock: print (f"读取者 {reader_id} : 获得读锁" ) time.sleep(sleep_time) print (f"读取者 {reader_id} : 完成读取" ) finally : print (f"读取者 {reader_id} : 释放读锁" ) def write_demo (writer_id, sleep_time=0.5 ): """用于演示的写入函数""" write_lock = rw_lock.gen_wlock() try : with write_lock: print (f"写入者 {writer_id} : 获得写锁" ) time.sleep(sleep_time) print (f"写入者 {writer_id} : 完成写入" ) finally : print (f"写入者 {writer_id} : 释放写锁" ) def demonstrate_read_read_nonblocking (): """演示读读不互斥""" print ("\n=== 演示:读读不互斥 ===" ) threads = [] for i in range (5 ): thread = threading.Thread(target=read_demo, args=(i, 0.5 )) threads.append(thread) thread.start() for thread in threads: thread.join() def demonstrate_read_write_blocking (): """演示读写互斥""" print ("\n=== 演示:读写互斥 ===" ) read_thread = threading.Thread(target=read_demo, args=(0 , 2 )) read_thread.start() time.sleep(0.1 ) write_thread = threading.Thread(target=write_demo, args=(0 , 0.5 )) write_thread.start() read_thread.join() write_thread.join() def demonstrate_write_write_blocking (): """演示写写互斥""" print ("\n=== 演示:写写互斥 ===" ) write_thread1 = threading.Thread(target=write_demo, args=(0 , 2 )) write_thread1.start() time.sleep(0.1 ) write_thread2 = threading.Thread(target=write_demo, args=(1 , 0.5 )) write_thread2.start() write_thread1.join() write_thread2.join() def demonstrate_timeout (): """演示锁获取超时""" print ("\n=== 演示:锁获取超时 ===" ) write_thread = threading.Thread(target=write_demo, args=(0 , 3 )) write_thread.start() time.sleep(0.1 ) read_lock = rw_lock.gen_rlock() if read_lock.acquire(blocking=True , timeout=0.5 ): try : print ("读取者: 成功获得读锁(不应该发生)" ) finally : read_lock.release() else : print ("读取者: 获取读锁超时(预期行为)" ) write_thread.join() if __name__ == '__main__' : demonstrate_read_read_nonblocking() demonstrate_read_write_blocking() demonstrate_write_write_blocking() demonstrate_timeout()
读写锁特性 描述 优势 适用场景 读共享/写独占 多个读取可并发,写入需独占 提高读多写少场景的并发性 配置数据、缓存系统、数据集 读写优先级 可以设置读优先或写优先 根据应用需求调整性能特性 根据读写比例调整策略 升级/降级 支持锁的升级(读 → 写)或降级(写 → 读) 灵活处理复杂访问模式 先检查后修改的操作
💡 使用建议 :
读多写少的场景推荐使用读写锁 注意防止 “写饥饿”,即读取者太多导致写入者长时间等待 锁排序(解决死锁) 为避免死锁,一个常用的技术是确保所有线程按照相同的顺序获取多个锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import threadingimport timeclass Account : """模拟银行账户""" def __init__ (self, name: str , balance: int = 0 ): """初始化账户""" self .name = name self .balance = balance self .lock = threading.RLock() self .id = id (self ) def __str__ (self ): return f"账户{self.name} [余额={self.balance} ]" def transfer_money (from_account: Account, to_account: Account, amount: int , thread_name: str ) -> None : """ 在账户间转账,使用账户ID排序策略避免死锁 from_account: 转出账户 to_account: 转入账户 amount: 转账金额 thread_name: 线程名称 """ first, second = sorted ([from_account, to_account], key=lambda x: x.id ) print (f"{thread_name} : 尝试锁定账户 {first.name} " ) with first.lock: print (f"{thread_name} : 已锁定账户 {first.name} " ) time.sleep(0.1 ) print (f"{thread_name} : 尝试锁定账户 {second.name} " ) with second.lock: print (f"{thread_name} : 已锁定账户 {second.name} " ) from_account.balance -= amount to_account.balance += amount print (f"{thread_name} : 已从{from_account.name} 转账{amount} 元到{to_account.name} " ) if __name__ == '__main__' : alice = Account("Alice" , 1000 ) bob = Account("Bob" , 1000 ) print (f"初始状态: {alice} , {bob} " ) t1 = threading.Thread( name="Thread-1" , target=transfer_money, args=(alice, bob, 500 , "转账线程1" ) ) t2 = threading.Thread( name="Thread-2" , target=transfer_money, args=(bob, alice, 300 , "转账线程2" ) ) t1.start() t2.start() t1.join() t2.join() print (f"最终状态: {alice} , {bob} " )
两阶段锁定 两阶段锁定是一种事务并发控制协议,分为获取阶段和释放阶段,可以保证事务的可串行化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 import threadingimport timeclass TwoPhaseLockDatabase : """演示两阶段锁定协议的简单数据库""" def __init__ (self ): """初始化数据库""" self .data = {'A' : 100 , 'B' : 200 } self .locks = {'A' : threading.RLock(), 'B' : threading.RLock()} def transaction (self, items_to_read, items_to_write, operation ): """执行两阶段锁定事务 Args: items_to_read: 需要读取的数据项列表 items_to_write: 需要写入的数据项列表 operation: 事务操作函数 Returns: bool: 事务是否成功 """ all_items = sorted (set (items_to_read + items_to_write)) acquired_locks = [] try : print (f"事务 {threading.current_thread().name} : 开始获取锁" ) for item in all_items: if self .locks[item].acquire(timeout=1 ): acquired_locks.append(item) print (f"事务 {threading.current_thread().name} : 已锁定 {item} " ) else : raise TimeoutError(f"获取 {item} 的锁超时" ) result = operation(self .data) print (f"事务 {threading.current_thread().name} : 操作完成" ) return result except Exception as e: print (f"事务 {threading.current_thread().name} : 错误 - {e} " ) return False finally : for item in acquired_locks: self .locks[item].release() print (f"事务 {threading.current_thread().name} : 已释放 {item} " ) def transfer_money (db, from_account, to_account, amount ): """转账事务""" def operation (data ): if data[from_account] < amount: print (f"账户 {from_account} 余额不足" ) return False time.sleep(0.1 ) data[from_account] -= amount data[to_account] += amount print (f"已从 {from_account} 转账 {amount} 到 {to_account} " ) return True return db.transaction([from_account, to_account], [from_account, to_account], operation) def run_transaction (db, thread_id, from_acc, to_acc, amount ): """执行事务的线程函数""" print (f"线程 {thread_id} : 尝试转账 {amount} 从 {from_acc} 到 {to_acc} " ) start_time = time.time() success = transfer_money(db, from_acc, to_acc, amount) elapsed = time.time() - start_time status = "成功" if success else "失败" print (f"线程 {thread_id} : 转账{status} ,耗时 {elapsed:.2 f} 秒" ) db = TwoPhaseLockDatabase() print (f"初始账户状态: {db.data} " )threads = [] transactions = [ ("A" , "B" , 30 ), ("B" , "A" , 50 ), ("A" , "B" , 20 ) ] for i, (from_acc, to_acc, amount) in enumerate (transactions): t = threading.Thread( name=f"Transaction-{i} " , target=run_transaction, args=(db, i, from_acc, to_acc, amount) ) threads.append(t) t.start() for t in threads: t.join() print (f"最终账户状态: {db.data} " )print (f"总金额: {sum (db.data.values())} " )
超时重试模式 在并发环境中,有时获取锁可能会失败。超时重试模式可以增加获取锁的成功概率,同时避免永久阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 import threadingimport timeimport randomresource_value = 0 resource_lock = threading.Lock() def update_resource (worker_id, max_retries=3 ): """更新共享资源,使用超时重试模式 Args: worker_id: 工作线程ID max_retries: 最大重试次数 Returns: bool: 是否成功更新 """ global resource_value operation_id = random.randint(10000 , 99999 ) print (f"工作线程 {worker_id} [操作:{operation_id} ]: 尝试更新资源" ) retry_count = 0 backoff = 0.1 while retry_count < max_retries: if resource_lock.acquire(timeout=0.5 ): try : print (f"工作线程 {worker_id} [操作:{operation_id} ]: 获取到锁,当前值: {resource_value} " ) local_value = resource_value work_time = random.uniform(0.1 , 1.0 ) time.sleep(work_time) resource_value = local_value + 1 print (f"工作线程 {worker_id} [操作:{operation_id} ]: 更新成功,新值: {resource_value} ,耗时: {work_time:.2 f} 秒" ) return True finally : resource_lock.release() print (f"工作线程 {worker_id} [操作:{operation_id} ]: 释放锁" ) else : retry_count += 1 print (f"工作线程 {worker_id} [操作:{operation_id} ]: 获取锁超时,重试 {retry_count} /{max_retries} " ) if retry_count < max_retries: time.sleep(backoff) backoff *= 2 print (f"工作线程 {worker_id} [操作:{operation_id} ]: 达到最大重试次数,放弃操作" ) return False def worker_thread (worker_id, operations ): """工作线程函数 Args: worker_id: 工作线程ID operations: 要执行的操作次数 """ success_count = 0 for i in range (operations): if update_resource(worker_id): success_count += 1 time.sleep(random.uniform(0.1 , 0.5 )) print (f"工作线程 {worker_id} : 完成 {success_count} /{operations} 次成功更新" ) threads = [] for i in range (5 ): t = threading.Thread(target=worker_thread, args=(i, 3 )) threads.append(t) t.start() for t in threads: t.join() print (f"最终资源值: {resource_value} " )
多进程与多线程结合的混合模型 对于复杂应用,常常需要结合多进程和多线程的优势:多进程跨越 GIL 限制利用多核心,每个进程内使用多线程处理 I/O 任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 import osimport timeimport multiprocessingimport threadingimport concurrent.futuresimport requestsfrom io import StringIOimport csvdef io_task (url ): """模拟I/O密集型任务:发送HTTP请求并处理响应""" try : response = requests.get(url, timeout=5 ) print (f"线程 {threading.current_thread().name} 完成请求 {url} , 状态码: {response.status_code} " ) return response.status_code except Exception as e: print (f"线程 {threading.current_thread().name} 请求 {url} 失败: {str (e)} " ) return None def cpu_task (data ): """模拟CPU密集型任务:处理CSV数据""" result = 0 for _ in range (1000000 ): result += 1 csv_data = StringIO(data) reader = csv.reader(csv_data) rows = list (reader) print (f"进程 {os.getpid()} 处理了 {len (rows)} 行数据" ) return len (rows) def process_worker (process_id, urls ): """每个进程的工作函数,使用线程池处理I/O任务""" print (f"进程 {os.getpid()} (ID: {process_id} ) 启动" ) with concurrent.futures.ThreadPoolExecutor(max_workers=4 ) as executor: future_to_url = {executor.submit(io_task, url): url for url in urls} results = [] for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try : status_code = future.result() results.append((url, status_code)) except Exception as e: print (f"处理 {url} 时出错: {str (e)} " ) sample_csv = "col1,col2,col3\n1,2,3\n4,5,6\n7,8,9" cpu_result = cpu_task(sample_csv) print (f"进程 {os.getpid()} (ID: {process_id} ) 完成所有任务" ) return results, cpu_result def main (): all_urls = [ f"https://httpbin.org/delay/{i%3 } " for i in range (16 ) ] url_chunks = [all_urls[i:i+4 ] for i in range (0 , len (all_urls), 4 )] start_time = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4 ) as process_executor: futures = [process_executor.submit(process_worker, i, urls) for i, urls in enumerate (url_chunks)] for future in concurrent.futures.as_completed(futures): try : io_results, cpu_result = future.result() print (f"进程返回结果: {len (io_results)} 个URL请求, CPU任务处理了 {cpu_result} 行数据" ) except Exception as e: print (f"进程执行出错: {str (e)} " ) elapsed_time = time.time() - start_time print (f"总执行时间: {elapsed_time:.2 f} 秒" ) if __name__ == "__main__" : main()
这种混合模型充分利用了 Python 的并发性能:
多进程并行 :跨越 GIL 限制,在多个 CPU 核心上同时执行 Python 代码每进程多线程 :处理进程内的 I/O 密集型任务,提高 I/O 并发性任务队列 :有效分配和管理工作负载,平衡资源利用细粒度锁与粗粒度锁 锁的粒度指锁保护资源的范围大小。细粒度锁保护小范围资源,提高并发度;粗粒度锁保护大范围资源,简化编程但可能降低并发度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 import threadingimport timeimport randomclass BankAccount : def __init__ (self, balance ): self .balance = balance self .coarse_lock = threading.Lock() self .read_lock = threading.Lock() self .write_lock = threading.Lock() def transfer_coarse (account, amount ): with account.coarse_lock: current_balance = account.balance time.sleep(random.uniform(0.001 , 0.005 )) account.balance = current_balance + amount print (f"粗粒度锁: 转账 {amount} ,当前余额 {account.balance} " ) def transfer_fine (account, amount ): with account.read_lock: current_balance = account.balance time.sleep(random.uniform(0.001 , 0.005 )) time.sleep(random.uniform(0.001 , 0.002 )) with account.write_lock: account.balance = current_balance + amount print (f"细粒度锁: 转账 {amount} ,当前余额 {account.balance} " ) def main (): account = BankAccount(1000 ) threads = [] print ("=== 测试粗粒度锁 ===" ) start_time = time.time() for i in range (10 ): amount = random.randint(1 , 100 ) t = threading.Thread(target=transfer_coarse, args=(account, amount)) threads.append(t) t.start() for t in threads: t.join() coarse_time = time.time() - start_time print (f"粗粒度锁总耗时: {coarse_time:.4 f} 秒" ) account.balance = 1000 threads = [] print ("\n=== 测试细粒度锁 ===" ) start_time = time.time() for i in range (10 ): amount = random.randint(1 , 100 ) t = threading.Thread(target=transfer_fine, args=(account, amount)) threads.append(t) t.start() for t in threads: t.join() fine_time = time.time() - start_time print (f"细粒度锁总耗时: {fine_time:.4 f} 秒" ) print (f"\n性能比较: 细粒度锁比粗粒度锁快 {(coarse_time / fine_time):.2 f} 倍" ) if __name__ == '__main__' : main()
锁粒度 优点 缺点 适用场景 粗粒度锁 简单、易维护、不易死锁 并发性能低、可能导致线程等待 简单应用、对性能要求不高的场景 细粒度锁 并发性能高、资源利用率高 实现复杂、可能造成死锁 高性能要求、资源访问模式明确的场景
14.7 消息队列与进程通信 在并发编程中,队列是一种常用的数据结构。它遵循 先进先出(FIFO) 的原则,适合用于线程或进程间的通信,而堆栈则遵循 后进先出(LIFO) 的原则。Python 中的 queue
和 multiprocessing
模块提供了多种类型的队列,每种队列适用于不同的场景。
队列基础知识 Python 的 queue
模块和 multiprocessing
模块提供了多种队列类型,主要包括:
队列类型 模块 特点 适用场景 Queue queue 线程安全的 FIFO 队列 线程间通信 LifoQueue queue 线程安全的 LIFO 队列(堆栈) 需要后进先出的场景 PriorityQueue queue 优先级队列 任务具有优先级的场景 Queue multiprocessing 进程安全的 FIFO 队列 进程间通信 JoinableQueue multiprocessing 带有任务完成通知机制的队列 生产者-消费者模型
队列使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import randomfrom multiprocessing import Queuefrom concurrent.futures import ThreadPoolExecutorimport timedef producer (q: Queue ) -> None : """生产者函数,负责生产数据并放入队列""" for i in range (10 ): item = f"小吃{i} " print (f"生产者生产了{item} " ) q.put(item) time.sleep(random.uniform(0.1 , 0.5 )) q.put(None ) print ("生产者结束" ) def consumer (q: Queue ): """消费者函数,负责从队列中获取数据并消费""" while True : item = q.get() if item is None : break print (f"消费者消费了{item} " ) time.sleep(random.uniform(1 , 2 )) print ("消费者结束" ) if __name__ == '__main__' : q = Queue() with ThreadPoolExecutor(max_workers=2 ) as executor: executor.submit(producer, q) executor.submit(consumer, q)
优先级队列示例 优先级队列按任务的优先级顺序处理任务。数字越小优先级越高。以下是如何使用 PriorityQueue
的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import queueimport threadingimport timepq = queue.PriorityQueue() def process_tasks (): """按照优先级处理任务""" while True : try : priority,task = pq.get(timeout=3 ) print (f"处理任务:[优先级{priority} ] {task} " ) pq.task_done() except queue.Empty: print ("队列为空,任务处理完毕" ) break pq.put((3 , "普通任务" )) pq.put((1 , "紧急任务" )) pq.put((2 , "中等优先级任务" )) pq.put((1 , "另一个紧急任务" )) pq.put((5 , "低优先级任务" )) worker = threading.Thread(target=process_tasks) worker.start() pq.join() worker.join() print ("所有任务处理完毕" )