1. 什么是进程,线程
进程(process)
- 进程是正在运行的程序的实例
- 操作系统利用进程把工作划分为一些功能单元
- 操作系统加载程序,以进程的方式在操作系统中运行它,并分配了系统资源给进程(内存等)
线程(thread)
- 进程中所包含的一个或多个执行单元称为线程(thread),线程是 CPU 调度和执行的基本单位
- 操作系统创建一个进程后,该进程会有一个主线程,主线程退出进程就会退出,并释放进程占用的资源
- 一个线程可以创建另一个线程,同一个进程中的多个线程之间可以并发执行
理解进程线程关系,公司与员工 <==> 进程与线程
- 公司本身是进程,它包含很多人,每个人是一个线程,还有资源打印机,会议室,资金等
- 真正执行工作的是线程(公司员工),公司的业务状态是靠人来执行的
- 这些人可以并行工作,处理事情
- 当大家访问公共资源的时候会有冲突,例如使用相同的会议室,打卡,这就需要队列排队,预约上锁
但是与现实模型不同的是
,这些人是有几个 cpu 控制的,例如 8 个 cpu 对应 80 个人,cpu 需要切换控制- 线程遇到阻塞的时候,cpu 就需要切换执行另一个线程,避免等待
2. python 多线程
- python 带有 threading 库,支持开发多线程程序
用对象实现多线程示例
import timeimport threadingdef main(): # 线程函数 def thr_func(data): ''' 循环打印,打印后暂停1秒 ''' for i in range(2): print(data, i) time.sleep(1) # 创建 3 个线程 # 创建第一个线程 one_thr = threading.Thread(target=thr_func, args=['one thr']) # 开始执行线程 one_thr.start() # 创建执行第二个线程 two_thr = threading.Thread(target=thr_func, args=['two thr']) two_thr.start() # 创建执行第三个线程 three_thr = threading.Thread(target=thr_func, args=['three thr']) three_thr.start() # 调用 join 等待线程结束 one_thr.join() two_thr.join() three_thr.join() # 创建 10 个线程 thrs = [threading.Thread(target=thr_func, args=['hello world thr: %d' % i]) for i in range(10)] # 开始执行线程 [thr.start() for thr in thrs] # 等待线程结束 [thr.join() for thr in thrs]if __name__ == '__main__': main()
运行程序,获取输出结果
# 运行python test_multi.py# 输出one thr 0two thr 0three thr 0hello world thr: 0 0hello world thr: 1 0hello world thr: 2 0hello world thr: 3 0hello world thr: 4 0hello world thr: 5 0hello world thr: 6 0hello world thr: 7 0hello world thr: 8 0hello world thr: 9 0In [54]: two thr 1three thr 1one thr 1hello world thr: 0 1hello world thr: 3 1hello world thr: 4 1hello world thr: 2 1hello world thr: 5 1hello world thr: 1 1hello world thr: 6 1hello world thr: 8 1hello world thr: 9 1hello world thr: 7 1
用继承实现线程函数
import timeimport threadingclass MyThread(threading.Thread): def __init__(self, myname, count): self._myname = myname self._count = count super().__init__() def run(self): """ 实现线程函数,start 后被调用 """ for i in range(self._count): print(self._myname, ',', i) time.sleep(0.5)if __name__ == '__main__': thr = MyThread('hello', 10) thr1 = MyThread('hello', 10) thr.start() thr1.start() # join 等待线程退出 thr.join() thr1.join()
3. 线程并发问题
多线程并发访问资源的时候,会有冲突,导致问题
用锁(lock)处理并发资源访问
下面是有问题的示例代码
import timeimport threadingimport randomdef concur_code(lock): counter = 0 def thr_func(): ''' 增加统计数值 ''' nonlocal counter for i in range(10): if lock: # 使用加锁的代码,同一时刻只能一个线程获取锁,进入运行 with lock: tmp_counter = counter time.sleep(0.01) tmp_counter += 1 time.sleep(0.01) counter = tmp_counter else: # 不加锁的时候会这里会出现冲突,线程间赋值会重复 tmp_counter = counter time.sleep(0.01) tmp_counter += 1 counter = tmp_counter # 创建 10 个线程 thrs = [threading.Thread(target=thr_func) for i in range(10)] [thr.start() for thr in thrs] [thr.join() for thr in thrs] print('all count: %d' % counter)if __name__ == '__main__': # lock = threading.Lock() lock = False concur_code(lock)
运行结果错误,本来应该10*10=100,但结果只有10
# 运行结果错误,本来应该10*10=100,但结果只有10python test_multi.pyall count: 10
用锁处理资源竞争
修改代码,使用线程锁
# 修改添加锁if __name__ == '__main__': lock = threading.Lock() concur_code(lock)
运行修改后代码
# 运行正常python test_multi.pyall count: 100
4. 用队列在线程间传递数据,通过队列在线程间分发处理事务
- Queue 是一个 FIFO 先进先出队列,可以放置任意类型的 python 对象
- 队列对象自动获取线程锁,释放锁,是线程安全的
- 任意时刻只有一个线程可以修改队列
- 可以用来在做线程间传递数据
下面是多个生产者,发送信息到 queue,多个消费者从 queue 获取信息使用的示例
import timeimport threadingimport queueimport randomimport itertoolsdef use_queue(): # 数据队列 dataQueue = queue.Queue() lock = threading.Lock() # 消费者数量 num_consumers = 2 # 生产者数量 num_producers = 4 # 生产者产生的消息数量 num_msges = 4 counter = 0 # 生产者函数 def producer(id_prod): for i in range(num_msges): time.sleep(id_prod) dataQueue.put('producer id={}, count={}'.format(id_prod, i)) # 消费者函数 def consumer(id_consumer): while True: time.sleep(0.1) try: data = dataQueue.get(block=False) except queue.Empty: if counter == num_producers * num_msges: return else: pass else: with lock: nonlocal counter counter += 1 print('consumer {}, total={}, got => {}' .format(id_consumer, counter, data)) # 创建消费线程 thr_consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(num_consumers)] # 创建生产线程 thr_producers = [threading.Thread(target=producer, args=(i,)) for i in range(num_producers)] # 开始执行线程 [thr.start() for thr in itertools.chain(thr_consumers, thr_producers)] # 等待线程执行结束 [thr.join() for thr in itertools.chain(thr_consumers, thr_producers)]if __name__ == '__main__': use_queue()