Search K
Appearance
Appearance
单线程串行(不加改造的技术) ---> 多线程并发(threading) ---> 多CPU并行(multiprocessing) ---> 多机器并行(hadoop/hive/spark)
一个进程中可以启动N个线程 一个线程中可以启动N个协程
# 1. 准备一个函数
def myFunc(a,b):
print(a,b)
# 2. 创建一个线程
import threading
t = threading.Thread(target=myFunc,args=(100,200))
# 3. 启动线程
t.start()
# 4. 等待 线程任务结束 相当于 await
t.join()很多复杂的事情一般都不会一下子做完,而是分很多中间步骤一步步完成
输入数据 ---> 处理器1 ---> 中间数据 ---> 处理器x ---> 中间数据 ---> 处理器n ---> 输出数据
生产者和消费者 就是一个典型的 pipeline
queue.Queue 可以用于多线程之间、线程安全的数据通信
#1. 导入类库
import queue
#2. 创建Queue
q = queue.Queue()
#3. 添加元素: 它是阻塞的,当q满了,它会卡住,等到里面空出来一个,它才会加进去
q.put(item)
#4. 获取元素:它是阻塞的,当没有元素的时候,它会卡住,等有元素的时候,它才会继续执行
item = q.get()
#5. 查询状态
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
#判断是否已满
q.full()某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的** **,使程序功能正确完成。由于线程的执行会随时切换,就造成了不可预料的结果
Lock(锁)是 Python 中用于解决线程安全问题的重要机制。它可以确保在同一时刻只有一个线程可以访问共享资源,从而避免数据竞争和不一致性问题。
这种方式需要显式地获取和释放锁:
import threading
lock = threading.Lock()
# 使用锁
lock.acquire()
try:
# do something
pass
finally:
lock.release()这种方式更加简洁和优雅,使用上下文管理器自动处理锁的获取和释放:
import threading
lock = threading.Lock()
# 使用锁
with lock:
# do something
pass推荐使用 with 模式,因为它更加简洁且安全,不会因为忘记释放锁而导致死锁问题。with 语句会确保在代码块执行完毕后自动释放锁,即使代码块中出现异常也能正确释放。
线程池管理着线程的完整生命周期,包含以下状态:
资源管理效率:
任务处理机制:
任务队列 [T T T T T T] <--- 新任务
线程池 {
线程1
线程...
线程N
}提升性能
适用场景
防御功能
代码优势
from concurrent.futures import ThreadPoolExecutor, as_completed
# 方法1:使用map函数(简单方式)
with ThreadPoolExecutor() as pool:
results = pool.map(craw, urls)
for result in results:
print(result)
# 注意:map的结果和输入参数是顺序对应的
# 方法2:使用future模式(更强大)
with ThreadPoolExecutor() as pool:
futures = [pool.submit(craw, url) for url in urls]
# 方式1:按照完成顺序获取结果
for future in futures:
print(future.result())
# 方式2:使用as_completed,谁先完成先处理谁
for future in as_completed(futures):
print(future.result())如果遇到了CPU密集型计算,多线程反而会降低执行速度!这是因为:
GIL的存在
CPU密集型计算的问题
multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。