Skip to content

python 并发编程

程序提速的几种粒度

单线程串行(不加改造的技术) ---> 多线程并发(threading) ---> 多CPU并行(multiprocessing) ---> 多机器并行(hadoop/hive/spark)

Python对并发编程的支持

  • 多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
  • 多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
  • 使用Lock对资源加锁,防止冲突访问
  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
  • 使用subprocess启动外部程序的进程,并进行输入输出交互

python 并发编程有三种方式

一个进程中可以启动N个线程 一个线程中可以启动N个协程

多进程 Process (multiprocessing)

  • 优点:可以利用多核CPU并行运算
  • 缺点:占用资源最多、可启动数目比线程少
  • 适用于:CPU密集型计算

多线程 Thread (threading)

  • 优点:相比进程,更轻量级、占用资源少
  • 缺点:
    • 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
    • 相比协程:启动数目有限制,占用内存资源,有线程切换开销
  • 适用于:IO密集型计算、同时运行的任务数目要求不多

多协程 Coroutine (asyncio)

  • 优点:内存开销最少、启动协程数量最多
  • 缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
  • 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景

多进程、多线程、多协程的技术选型

python速度慢的两大原因

  1. 动态类型语言,边解释边运行
  2. GIL无法利用多核CPU并发执行 tip: GIL 全局解释器锁(Globe Interpreter Lock),是计算机程序设计语言解释器用于同步线程的一种机制,使得任何时刻仅有一个线程在执行。即使在多核心处理器上也只允许同一时间执行一个线程。它的好处是,简化了python对共享资源的管理。

python 创建多线程的方法

python
# 1. 准备一个函数

def myFunc(a,b):
    print(a,b)

# 2. 创建一个线程

import threading

t = threading.Thread(target=myFunc,args=(100,200))

# 3. 启动线程
t.start()

# 4. 等待 线程任务结束 相当于 await
t.join()

python 生产者消费者模式

  • 多组件的Pipeline技术架构
  • 多线程数据通信的queue.Queue

多组件的Pipeline技术架构

很多复杂的事情一般都不会一下子做完,而是分很多中间步骤一步步完成

输入数据 ---> 处理器1 ---> 中间数据 ---> 处理器x ---> 中间数据 ---> 处理器n ---> 输出数据

生产者和消费者 就是一个典型的 pipeline

多线程数据通信的 queue.Queue

queue.Queue 可以用于多线程之间、线程安全的数据通信

python
#1. 导入类库
import queue

#2. 创建Queue
q = queue.Queue()

#3. 添加元素: 它是阻塞的,当q满了,它会卡住,等到里面空出来一个,它才会加进去

q.put(item)

#4. 获取元素:它是阻塞的,当没有元素的时候,它会卡住,等有元素的时候,它才会继续执行

item = q.get()

#5. 查询状态
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
#判断是否已满
q.full()

线程安全

某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的** **,使程序功能正确完成。由于线程的执行会随时切换,就造成了不可预料的结果

Lock 用于解决线程安全问题

Lock(锁)是 Python 中用于解决线程安全问题的重要机制。它可以确保在同一时刻只有一个线程可以访问共享资源,从而避免数据竞争和不一致性问题。

使用 Lock 的两种方式

1. try-finally 模式

这种方式需要显式地获取和释放锁:

python
import threading

lock = threading.Lock()

# 使用锁
lock.acquire()
try:
    # do something
    pass
finally:
    lock.release()

2. with 模式

这种方式更加简洁和优雅,使用上下文管理器自动处理锁的获取和释放:

python
import threading

lock = threading.Lock()

# 使用锁
with lock:
    # do something
    pass

推荐使用 with 模式,因为它更加简洁且安全,不会因为忘记释放锁而导致死锁问题。with 语句会确保在代码块执行完毕后自动释放锁,即使代码块中出现异常也能正确释放。

线程池的原理

线程的生命周期

线程池管理着线程的完整生命周期,包含以下状态:

  1. 新建(start):创建新线程,系统需要分配资源
  2. 就绪:等待获取CPU资源
  3. 运行:获得CPU资源后执行run方法
    • 运行完成后进入终止状态
    • 遇到sleep/io操作时进入阻塞状态
  4. 阻塞:sleep/io操作结束后重新回到就绪状态
  5. 终止:run方法执行完成,系统回收资源

为什么需要线程池?

  1. 资源管理效率

    • 新建线程系统需要分配资源
    • 终止线程系统需要回收资源
    • 如果可以重用线程,就可以减少新建/终止的开销
  2. 任务处理机制

    • 线程池维护一个任务队列
    • 新任务会被加入到任务队列中
    • 线程池中的线程(通常称为工作线程)会不断从任务队列中获取任务并执行

线程池的结构

任务队列 [T T T T T T] <--- 新任务

线程池 {
    线程1
    线程...
    线程N
}

使用线程池的好处

  1. 提升性能

    • 减少了大量新建、终止线程的开销
    • 实现了线程资源的重用
  2. 适用场景

    • 适合处理突发性大量请求
    • 适合需要大量线程完成任务,但实际任务处理时间较短的场景
  3. 防御功能

    • 能有效避免系统因为创建线程过多而导致系统负荷过大
    • 防止系统响应变慢等问题
  4. 代码优势

    • 使用线程池的语法比自己新建线程执行线程更加简洁
    • 提供了更好的代码可维护性

ThreadPoolExecutor的使用方法

python
from concurrent.futures import ThreadPoolExecutor, as_completed

# 方法1:使用map函数(简单方式)
with ThreadPoolExecutor() as pool:
    results = pool.map(craw, urls)
    for result in results:
        print(result)
    # 注意:map的结果和输入参数是顺序对应的

# 方法2:使用future模式(更强大)
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(craw, url) for url in urls]
    
    # 方式1:按照完成顺序获取结果
    for future in futures:
        print(future.result())
    
    # 方式2:使用as_completed,谁先完成先处理谁
    for future in as_completed(futures):
        print(future.result())

为什么需要多进程(multiprocessing)

CPU密集型计算的局限性

如果遇到了CPU密集型计算,多线程反而会降低执行速度!这是因为:

  1. GIL的存在

    • 虽然有全局解释器锁GIL
    • 但是因为有IO的存在,多线程依然可以加速运行
    • 这仅适用于IO密集型任务
  2. CPU密集型计算的问题

    • 线程的自动切换反而变成了负担
    • 多线程甚至会减慢运行速度

multiprocessing的引入

multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。