news 2026/6/13 21:41:52

Python多线程编程实战指南

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python多线程编程实战指南

一、理论基础

1.1 线程与进程的区别

  • 进程:拥有独立的内存空间、数据栈等,系统开销大,进程间通信复杂。

  • 线程:依附于进程,共享进程的内存空间,创建和切换开销小,但需要自己处理共享数据的同步问题。

1.2 Python 的 GIL(全局解释器锁)

  • CPython 解释器中有一个 GIL,同一时刻只允许一个线程执行 Python 字节码

  • 因此,Python 的多线程并不能利用多核 CPU 并行计算(计算密集型任务反而可能变慢)。

  • 适用场景I/O 密集型任务(网络请求、文件读写、用户输入等),线程在等待 I/O 时会释放 GIL,从而真正并发。

1.3 两个线程模块

  • thread(Python 2 中的名字,Python 3 中为_thread):底层、原始,不推荐直接使用。

  • threading:基于_thread封装,提供了更强大的Thread类和各种同步工具,推荐使用


二、实操环境准备

  • Python 3.6+(任何较新版本都可以)

  • 任意代码编辑器(VS Code、PyCharm、甚至记事本)

  • 命令行 / 终端

验证 Python 版本

python --version # 应显示 3.x

三、实操步骤 1:创建并启动线程(两种方式)

3.1 方式一:函数式(使用threading.Thread(target=...)

新建文件thread_func.py,写入以下代码:

import threading import time def print_time(thread_name, delay, counter): """线程函数:每隔 delay 秒打印一次当前时间,重复 counter 次""" while counter > 0: time.sleep(delay) print(f"{thread_name}: {time.ctime()}") counter -= 1 # 创建两个线程 t1 = threading.Thread(target=print_time, args=("Thread-1", 2, 5)) t2 = threading.Thread(target=print_time, args=("Thread-2", 4, 5)) # 启动线程 t1.start() t2.start() # 等待两个线程都结束(否则主线程可能提前退出) t1.join() t2.join() print("所有线程执行完毕,主线程退出。")

运行

python thread_func.py

预期输出(时间会变化,但顺序类似):

Thread-1: Thu Jun 11 15:42:17 2026 Thread-1: Thu Jun 11 15:42:19 2026 Thread-2: Thu Jun 11 15:42:19 2026 Thread-1: Thu Jun 11 15:42:21 2026 Thread-2: Thu Jun 11 15:42:23 2026 ...

说明

  • threading.Thread(target=函数名, args=元组)创建线程对象。

  • start()使线程开始运行(调用run(),默认run()会执行target指定的函数)。

  • join()阻塞主线程,直到被调用的线程结束,避免主线程提前退出。

3.2 方式二:继承threading.Thread

新建文件thread_class.py

import threading import time class MyThread(threading.Thread): def __init__(self, thread_id, name, delay, counter): super().__init__() # 调用父类初始化 self.thread_id = thread_id self.name = name self.delay = delay self.counter = counter def run(self): """线程启动后自动执行的方法""" print(f"开始线程:{self.name}") while self.counter > 0: time.sleep(self.delay) print(f"{self.name}: {time.ctime()}") self.counter -= 1 print(f"退出线程:{self.name}") # 创建线程实例 thread1 = MyThread(1, "Thread-1", 1, 5) thread2 = MyThread(2, "Thread-2", 2, 5) thread1.start() thread2.start() thread1.join() thread2.join() print("主线程结束")

运行

python thread_class.py

效果与函数式类似,但更符合面向对象风格,适合需要保存更多状态或复用的场景。


四、实操步骤 2:线程同步(使用锁)

当多个线程修改同一份数据时,会发生“竞态条件”。下面用两个线程同时对一个全局变量加 1000000 次来演示问题,然后用锁解决。

4.1 无锁的错误示例

新建race_condition.py

import threading # 共享资源 counter = 0 def increment(thread_name, times): global counter for _ in range(times): counter += 1 # 这不是原子操作!可能被线程切换中断 # 创建两个线程,每个加 1000000 次 t1 = threading.Thread(target=increment, args=("A", 1000000)) t2 = threading.Thread(target=increment, args=("B", 1000000)) t1.start() t2.start() t1.join() t2.join() print(f"期望结果: 2000000") print(f"实际结果: {counter}")

运行几次,你会发现结果往往小于 2000000(如 1845214 等),这就是因为线程交替执行时丢失了更新。

4.2 使用锁(threading.Lock)修复

新建with_lock.py

import threading counter = 0 lock = threading.Lock() # 创建一个锁对象 def increment_safe(thread_name, times): global counter for _ in range(times): lock.acquire() # 获取锁(如果已被其他线程持有,则阻塞) # 以下临界区代码同一时刻只有一个线程能执行 counter += 1 lock.release() # 释放锁 # 也可以使用 with 语句(更推荐): def increment_safe_with(thread_name, times): global counter for _ in range(times): with lock: # 自动 acquire 和 release counter += 1 t1 = threading.Thread(target=increment_safe, args=("A", 1000000)) t2 = threading.Thread(target=increment_safe, args=("B", 1000000)) t1.start() t2.start() t1.join() t2.join() print(f"加锁后的结果: {counter}") # 稳定输出 2000000

说明

  • lock.acquire()lock.release()之间的代码称为“临界区”,一次只允许一个线程进入。

  • 使用with lock:更加简洁安全,避免忘记释放锁。


五、实操步骤 3:线程间通信 - 队列(Queue)

queue.Queue是线程安全的 FIFO 队列,非常适合生产者-消费者模式。

示例:三个工人线程处理一个任务队列

新建queue_demo.py

import threading import queue import time import random # 任务队列,最大长度 5 task_queue = queue.Queue(maxsize=5) def producer(name, total_tasks): """生产者:往队列中放任务""" for i in range(total_tasks): task = f"任务-{i}" task_queue.put(task) # 如果队列满,会阻塞直到有空间 print(f"{name} 生产了 {task}") time.sleep(random.uniform(0.2, 0.5)) # 生产结束后发送“毒丸”信号,通知消费者退出(数量等于消费者个数) for _ in range(3): task_queue.put(None) def consumer(name): """消费者:从队列中取任务并处理""" while True: task = task_queue.get() # 如果队列空,会阻塞直到有新任务 if task is None: # 收到毒丸,退出 break print(f"{name} 开始处理 {task}") time.sleep(random.uniform(0.3, 0.7)) # 模拟处理耗时 print(f"{name} 完成 {task}") task_queue.task_done() # 告诉队列该任务已完成 # 创建线程 prod = threading.Thread(target=producer, args=("生产者", 10)) cons1 = threading.Thread(target=consumer, args=("消费者-1",)) cons2 = threading.Thread(target=consumer, args=("消费者-2",)) cons3 = threading.Thread(target=consumer, args=("消费者-3",)) # 启动消费者(先启动,会阻塞等待任务) cons1.start() cons2.start() cons3.start() # 启动生产者 prod.start() # 等待生产者结束 prod.join() # 等待所有任务被处理(队列为空且 task_done 调用次数匹配) task_queue.join() print("所有任务处理完毕")

运行

bash

python queue_demo.py

你会看到三个消费者并发地从队列中取出任务处理,整个过程自动同步,无需手动加锁。

关键方法

  • put(item):放入元素(队列满时阻塞)

  • get():取出元素(队列空时阻塞)

  • task_done():表示之前取出的任务已完成

  • join():阻塞直到队列中所有元素都被task_done()确认


六、实操步骤 4:使用threading的其他常用功能

6.1 获取当前线程信息

import threading def show_info(): t = threading.current_thread() print(f"当前线程: {t.name}, ID: {t.ident}, 是否存活: {t.is_alive()}") t = threading.Thread(target=show_info, name="我的线程") t.start() t.join()

运行结果为:

6.2 线程枚举

print(f"活动线程数: {threading.active_count()}") for t in threading.enumerate(): print(t.name)

6.3 守护线程(Daemon Thread)

主线程结束时,守护线程会自动被终止。适合后台监控任务。

d = threading.Thread(target=some_task, daemon=True) d.start() # 主线程结束后,d 会立即被强制结束

七、注意事项与最佳实践

场景推荐做法
避免使用_thread模块始终使用threadingqueue
共享可变数据使用LockRLockQueue
生产者-消费者模型优先使用queue.Queue,不要自己用锁实现
线程池使用concurrent.futures.ThreadPoolExecutor
计算密集型任务改用multiprocessing模块利用多核 CPU
线程间通信使用QueueEventCondition

八、扩展学习:线程池(ThreadPoolExecutor

对于大量短期任务,频繁创建销毁线程开销大,线程池可以复用线程。

from concurrent.futures import ThreadPoolExecutor import time def task(n): time.sleep(1) return n * n with ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(task, [1, 2, 3, 4, 5]) print(list(results)) # [1, 4, 9, 16, 25]

map方法自动将迭代器的元素分配给线程池中的线程执行,非常方便

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/13 21:40:53

16-35岁必看!想学参军加分技能?穿越机拆装才是隐形王牌

很多准备入伍、向往军营的年轻人,都在纠结:没有特长、没有技能、入伍之后会不会没有优势?其实,当下部队青睐和紧缺的,不是花哨证书,而是动手能力强、懂装备原理、适应智能装备岗位的青年。尤其对于16–35岁…

作者头像 李华
网站建设 2026/6/13 21:36:00

保姆级教程:用PFC模拟岩石巴西劈裂试验(从成样到加载完整流程)

从零开始用PFC模拟岩石巴西劈裂试验:完整流程与深度解析巴西劈裂试验作为岩石力学领域的经典测试方法,在评估岩石抗拉强度方面具有不可替代的价值。对于刚接触PFC(Particle Flow Code)的研究人员来说,如何准确复现这一…

作者头像 李华
网站建设 2026/6/13 21:34:57

Python入门:从零开始写你的第一个AI智能体,30分钟搞定!

🎁个人主页:我滴老baby 🎉欢迎大家点赞👍评论📝收藏⭐文章 🔍系列专栏:AI 文章目录:Python入门:从零开始写你的第一个AI智能体,30分钟搞定!一、什…

作者头像 李华
网站建设 2026/6/13 21:34:55

毛利和毛利率到底怎么看?一篇讲透毛利分析的底层逻辑

很多人看生意赚不赚钱,第一反应就是看销售额。这个月卖了100万,比上月多了30万,感觉业绩不错。但真正懂经营的人,不会先问“卖了多少”,而会先问一句:毛利多少?毛利率多少?因为销售额…

作者头像 李华
网站建设 2026/6/13 21:33:55

Linux内核学习17--SPI子系统

1 Linux下SPI子系统简介 关于SPI协议细节,之前写过:总线学习3--SPI_spi a0-CSDN博客 这次就不多写了,这次主要关注Linux下的使用,使用的环境是树莓派5。还是先看图吧,毕竟无图无真相。这个图B格看起来有点高。 不过对…

作者头像 李华
网站建设 2026/6/13 21:27:18

i.MX23引脚复用与GPIO配置实战:从原理到避坑指南

1. 项目概述与核心价值在嵌入式硬件开发中,最基础也最关键的环节之一,就是搞定芯片的引脚。这听起来简单,不就是把线连上吗?但当你面对像i.MX23这样一颗集成了LCD、NAND Flash、多个UART、I2C、PWM等丰富外设的应用处理器时&#…

作者头像 李华